Repository: incubator-eagle Updated Branches: refs/heads/master c69f94eff -> 021c2bddd
add ScheduleStateCleaner.java https://issues.apache.org/jira/browse/EAGLE-803 Author: Zhao, Qingwen <qingwz...@apache.org> Closes #694 from qingwen220/EAGLE-803. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/021c2bdd Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/021c2bdd Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/021c2bdd Branch: refs/heads/master Commit: 021c2bddddd98892442f43d0425c67effa192273 Parents: c69f94e Author: Zhao, Qingwen <qingwz...@apache.org> Authored: Tue Nov 29 15:14:04 2016 +0800 Committer: Zhao, Qingwen <qingwz...@apache.org> Committed: Tue Nov 29 15:14:04 2016 +0800 ---------------------------------------------------------------------- .../engine/coordinator/PublishmentType.java | 9 +- .../alert/service/IMetadataServiceClient.java | 2 + .../service/MetadataServiceClientImpl.java | 7 + .../engine/coordinator/PublishmentTypeTest.java | 3 +- .../eagle/alert/coordinator/Coordinator.java | 12 ++ .../trigger/ScheduleStateCleaner.java | 53 +++++++ .../mock/InMemMetadataServiceClient.java | 5 + .../integration/MockMetadataServiceClient.java | 5 + .../metadata/resource/MetadataResource.java | 10 +- .../alert-metadata/pom.xml | 4 + .../eagle/alert/metadata/IMetadataDao.java | 7 +- .../metadata/impl/InMemMetadataDaoImpl.java | 11 ++ .../metadata/impl/JdbcDatabaseHandler.java | 106 ++++++++++++-- .../metadata/impl/JdbcMetadataDaoImpl.java | 29 +++- .../alert/metadata/impl/JdbcSchemaManager.java | 1 + .../metadata/impl/MongoMetadataDaoImpl.java | 11 ++ .../eagle/alert/metadata/impl/InMemoryTest.java | 4 +- .../eagle/alert/metadata/impl/JdbcImplTest.java | 51 +++++-- .../src/test/resources/application-jdbc.conf | 25 ++++ .../src/test/resources/application-mysql.conf | 23 ---- .../alert-metadata/src/test/resources/init.sql | 70 ++++++++++ eagle-server-assembly/src/main/conf/eagle.conf | 138 ++++++++++--------- .../src/main/resources/application.conf | 2 + 23 files changed, 462 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java index 5329dfa..2718cfe 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java @@ -21,6 +21,8 @@ package org.apache.eagle.alert.engine.coordinator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.commons.lang3.builder.HashCodeBuilder; +import java.util.List; +import java.util.Map; import java.util.Objects; @JsonIgnoreProperties(ignoreUnknown = true) @@ -29,7 +31,8 @@ public class PublishmentType { private String type; private String className; private String description; - private String fields; + + private List<Map<String, String>> fields; public String getType() { return type; @@ -55,11 +58,11 @@ public class PublishmentType { this.description = description; } - public String getFields() { + public List<Map<String, String>> getFields() { return fields; } - public void setFields(String fields) { + public void setFields(List<Map<String, String>> fields) { this.fields = fields; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java index b00fc78..efa6d0e 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java @@ -86,6 +86,8 @@ public interface IMetadataServiceClient extends Closeable, Serializable { void clear(); + void clearScheduleState(int maxCapacity); + // for topology mgmt // for alert event http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java index 209a3a6..8571e56 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java @@ -69,6 +69,7 @@ public class MetadataServiceClientImpl implements IMetadataServiceClient { private static final String METADATA_ALERTS_BATCH_PATH = "/metadata/alerts/batch"; private static final String METADATA_CLEAR_PATH = "/metadata/clear"; + private static final String METADATA_CLEAR_SCHEDULESTATES_PATH = "/metadata/clear/schedulestates"; private static final String EAGLE_CORRELATION_CONTEXT = "metadataService.context"; public static final String EAGLE_CORRELATION_SERVICE_PORT = "metadataService.port"; @@ -279,6 +280,12 @@ public class MetadataServiceClientImpl implements IMetadataServiceClient { } @Override + public void clearScheduleState(int maxCapacity) { + WebResource r = client.resource(basePath + METADATA_CLEAR_SCHEDULESTATES_PATH); + r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(maxCapacity); + } + + @Override public List<AlertPublishEvent> listAlertPublishEvent() { return list(METADATA_ALERTS_PATH, new GenericType<List<AlertPublishEvent>>(){}); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java index 91f9cf8..957ac9a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java @@ -20,19 +20,18 @@ import org.junit.Assert; import org.junit.Test; public class PublishmentTypeTest { + @Test public void testPublishmentType() { PublishmentType publishmentType = new PublishmentType(); publishmentType.setType("KAFKA"); publishmentType.setClassName("setClassName"); publishmentType.setDescription("setDescription"); - publishmentType.setFields("setFields"); PublishmentType publishmentType1 = new PublishmentType(); publishmentType1.setType("KAFKA"); publishmentType1.setClassName("setClassName"); publishmentType1.setDescription("setDescription"); - publishmentType1.setFields("setFields"); Assert.assertFalse(publishmentType.equals(new String(""))); Assert.assertFalse(publishmentType == publishmentType1); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java index 2a0abce..cccf2e3 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java @@ -29,6 +29,7 @@ import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder; import org.apache.eagle.alert.coordinator.trigger.CoordinatorTrigger; import org.apache.eagle.alert.coordinator.trigger.DynamicPolicyLoader; import org.apache.eagle.alert.coordinator.trigger.PolicyChangeListener; +import org.apache.eagle.alert.coordinator.trigger.ScheduleStateCleaner; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.service.IMetadataServiceClient; import org.apache.eagle.alert.service.MetadataServiceClientImpl; @@ -73,6 +74,10 @@ public class Coordinator { private static final String METADATA_SERVICE_CONTEXT = "metadataService.context"; private static final String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis"; private static final String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis"; + private static final String DYNAMIC_SCHEDULE_STATE_CLEAR_MIN = "metadataDynamicCheck.stateClearPeriodMin"; + private static final String DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY = "metadataDynamicCheck.stateReservedCapacity"; + + private static final int DEFAULT_STATE_RESERVE_CAPACITY = 1000; public static final String GREEDY_SCHEDULER_ZK_PATH = "/alert/greedy/leader"; @@ -241,6 +246,13 @@ public class Coordinator { loader.addPolicyChangeListener(new PolicyChangeHandler(config, client)); scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS); + if (config.hasPath(DYNAMIC_SCHEDULE_STATE_CLEAR_MIN) && config.hasPath(DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY)) { + int period = config.getInt(DYNAMIC_SCHEDULE_STATE_CLEAR_MIN); + int capacity = config.getInt(DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY); + ScheduleStateCleaner cleaner = new ScheduleStateCleaner(client, capacity); + scheduleSrv.scheduleAtFixedRate(cleaner, period, period, TimeUnit.MINUTES); + } + Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv))); LOG.info("Eagle Coordinator started ..."); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java new file mode 100644 index 0000000..0229c20 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.coordinator.trigger; + +import com.google.common.base.Stopwatch; +import org.apache.eagle.alert.service.IMetadataServiceClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class ScheduleStateCleaner implements Runnable { + + private static Logger LOG = LoggerFactory.getLogger(ScheduleStateCleaner.class); + + private IMetadataServiceClient client; + private int reservedCapacity; + + public ScheduleStateCleaner(IMetadataServiceClient client, int capacity) { + this.client = client; + this.reservedCapacity = capacity; + } + + @Override + public void run() { + // we should catch every exception to avoid zombile thread + try { + final Stopwatch watch = Stopwatch.createStarted(); + LOG.info("clear schedule states start."); + client.clearScheduleState(reservedCapacity); + watch.stop(); + LOG.info("clear schedule states completed. used time milliseconds: {}", watch.elapsed(TimeUnit.MILLISECONDS)); + // reset cached policies + } catch (Throwable t) { + LOG.error("fail to clear schedule states due to {}, but continue to run", t.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java index ee7ca54..826cde4 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java @@ -184,6 +184,11 @@ public class InMemMetadataServiceClient implements IMetadataServiceClient { } @Override + public void clearScheduleState(int maxCapacity) { + + } + + @Override public List<AlertPublishEvent> listAlertPublishEvent() { return this.alerts; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java index 7f650c6..2d3ee85 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java @@ -154,6 +154,11 @@ public class MockMetadataServiceClient implements IMetadataServiceClient { } @Override + public void clearScheduleState(int maxCapacity) { + + } + + @Override public List<AlertPublishEvent> listAlertPublishEvent() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java index fc4a2bd..751853c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java @@ -31,6 +31,7 @@ import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory; import org.apache.eagle.alert.metadata.resource.Models; import org.apache.eagle.alert.metadata.resource.OpResult; + import com.google.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +74,12 @@ public class MetadataResource { return dao.clear(); } + @Path("/clear/schedulestates") + @POST + public OpResult clearScheduleStates(int capacity) { + return dao.clearScheduleState(capacity); + } + @Path("/export") @POST public Models export() { @@ -291,7 +298,7 @@ public class MetadataResource { } } } else { - throw new IllegalArgumentException("Publishsment (name: " + publishmentId + ") not found"); + throw new IllegalArgumentException("Publishment (name: " + publishmentId + ") not found"); } } @@ -535,4 +542,5 @@ public class MetadataResource { return results; } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml index ebe24e2..1711f0a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml @@ -44,6 +44,10 @@ <version>${mongodb.version}</version> </dependency> <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + </dependency> + <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql-connector-java.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java index 19d2b31..c5221c2 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java @@ -25,13 +25,12 @@ import org.apache.eagle.alert.engine.coordinator.*; import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.metadata.resource.Models; import org.apache.eagle.alert.metadata.resource.OpResult; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; -import java.util.HashMap; import java.util.List; -import java.util.Map; public interface IMetadataDao extends Closeable { @@ -90,8 +89,12 @@ public interface IMetadataDao extends Closeable { ScheduleState getScheduleState(); + List<ScheduleState> listScheduleStates(); + OpResult addScheduleState(ScheduleState state); + OpResult clearScheduleState(int maxCapacity); + List<PolicyAssignment> listAssignments(); OpResult addAssignment(PolicyAssignment assignment); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java index 611bbb4..adfe15a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java @@ -28,6 +28,7 @@ import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.alert.metadata.MetadataUtils; import org.apache.eagle.alert.metadata.resource.Models; import org.apache.eagle.alert.metadata.resource.OpResult; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -267,6 +268,16 @@ public class InMemMetadataDaoImpl implements IMetadataDao { } @Override + public List<ScheduleState> listScheduleStates() { + throw new UnsupportedOperationException("listScheduleStates not support!"); + } + + @Override + public OpResult clearScheduleState(int maxCapacity) { + throw new UnsupportedOperationException("clearScheduleState not support!"); + } + + @Override public List<PolicyAssignment> listAssignments() { return assignments; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java index 550eb00..933c02e 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java @@ -19,12 +19,20 @@ package org.apache.eagle.alert.metadata.impl; import org.apache.commons.dbcp.BasicDataSource; +import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; +import org.apache.eagle.alert.coordination.model.ScheduleState; +import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; +import org.apache.eagle.alert.coordination.model.internal.Topology; +import org.apache.eagle.alert.engine.coordinator.*; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.metadata.MetadataUtils; import org.apache.eagle.alert.metadata.resource.OpResult; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,23 +55,39 @@ public class JdbcDatabaseHandler { private static final String QUERY_CONDITION_STATEMENT = "SELECT value FROM %s WHERE id=?"; private static final String QUERY_ORDERBY_STATEMENT = "SELECT value FROM %s ORDER BY id %s"; private static final String QUERY_ALL_STATEMENT_WITH_SIZE = "SELECT value FROM %s limit %s"; + private static final String CLEAR_SCHEDULESTATES_STATEMENT = "DELETE FROM schedule_state WHERE id NOT IN (SELECT id from (SELECT id FROM schedule_state ORDER BY id DESC limit ?) as states)"; public enum SortType { DESC, ASC } - private Map<String, String> tblNameMap = new HashMap<>(); + private static Map<String, String> tblNameMap = new HashMap<>(); private static final ObjectMapper mapper = new ObjectMapper(); private DataSource dataSource; static { mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + registerTableName(StreamingCluster.class.getSimpleName(), "cluster"); + registerTableName(StreamDefinition.class.getSimpleName(), "stream_schema"); + registerTableName(Kafka2TupleMetadata.class.getSimpleName(), "datasource"); + registerTableName(PolicyDefinition.class.getSimpleName(), "policy"); + registerTableName(Publishment.class.getSimpleName(), "publishment"); + registerTableName(PublishmentType.class.getSimpleName(), "publishment_type"); + registerTableName(ScheduleState.class.getSimpleName(), "schedule_state"); + registerTableName(PolicyAssignment.class.getSimpleName(), "assignment"); + registerTableName(Topology.class.getSimpleName(), "topology"); + registerTableName(AlertPublishEvent.class.getSimpleName(), "alert_event"); + } + + private static void registerTableName(String clzName, String tblName) { + tblNameMap.put(clzName, tblName); } public JdbcDatabaseHandler(Config config) { // "jdbc:mysql://dbhost/database?" + "user=sqluser&password=sqluserpw" - this.tblNameMap = JdbcSchemaManager.tblNameMap; + //this.tblNameMap = JdbcSchemaManager.tblNameMap; try { - JdbcSchemaManager.getInstance().init(config); + //JdbcSchemaManager.getInstance().init(config); BasicDataSource bDatasource = new BasicDataSource(); bDatasource.setDriverClassName(config.getString(MetadataUtils.JDBC_DRIVER_PATH)); if (config.hasPath(MetadataUtils.JDBC_USERNAME_PATH)) { @@ -115,7 +139,7 @@ public class JdbcDatabaseHandler { connection.commit(); } catch (SQLException e) { LOG.error(e.getMessage(), e.getCause()); - if (e.getMessage().toLowerCase().contains("duplicate") && connection != null) { + if (connection != null) { LOG.info("Detected duplicated entity"); try { connection.rollback(savepoint); @@ -193,15 +217,10 @@ public class JdbcDatabaseHandler { return executeSelectStatement(clz, query); } - public <T> T listTop(Class<T> clz, String sortType) { + public <T> List<T> listOrderBy(Class<T> clz, String sortType) { String tb = getTableName(clz.getSimpleName()); String query = String.format(QUERY_ORDERBY_STATEMENT, tb, sortType); - List<T> result = executeSelectStatement(clz, query); - if (result.isEmpty()) { - return null; - } else { - return result.get(0); - } + return executeSelectStatement(clz, query); } public <T> T listWithFilter(String key, Class<T> clz) { @@ -287,7 +306,7 @@ public class JdbcDatabaseHandler { Connection connection = null; try { connection = dataSource.getConnection(); - PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb, key)); + PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb)); statement.setString(1, key); int status = statement.executeUpdate(); String msg = String.format("delete %s entities from table %s", status, tb); @@ -314,4 +333,67 @@ public class JdbcDatabaseHandler { //JdbcSchemaManager.getInstance().shutdown(); } + public OpResult removeBatch(String clzName, List<String> keys) { + String tb = getTableName(clzName); + OpResult result = new OpResult(); + Connection connection = null; + try { + connection = dataSource.getConnection(); + connection.setAutoCommit(false); + PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb)); + for (String key : keys) { + statement.setString(1, key); + statement.addBatch(); + } + int[] num = statement.executeBatch(); + connection.commit(); + int sum = 0; + for (int i : num) { + sum += i; + } + String msg = String.format("delete %s records from table %s", sum, tb); + result.code = OpResult.SUCCESS; + result.message = msg; + statement.close(); + } catch (SQLException e) { + result.code = OpResult.FAILURE; + result.message = e.getMessage(); + LOG.error(e.getMessage(), e); + } finally { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause()); + } + } + } + return result; + } + + public OpResult removeScheduleStates(int capacity) { + OpResult result = new OpResult(); + Connection connection = null; + try { + connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(CLEAR_SCHEDULESTATES_STATEMENT); + statement.setInt(1, capacity); + result.message = String.format("delete %d records from schedule_state", statement.executeUpdate()); + result.code = OpResult.SUCCESS; + statement.close(); + } catch (SQLException e) { + result.code = OpResult.FAILURE; + result.message = e.getMessage(); + LOG.error(e.getMessage(), e); + } finally { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause()); + } + } + } + return result; + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java index 22435fe..384eddc 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java @@ -26,10 +26,14 @@ import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.alert.metadata.MetadataUtils; import org.apache.eagle.alert.metadata.resource.Models; import org.apache.eagle.alert.metadata.resource.OpResult; + import com.google.inject.Inject; import com.typesafe.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -37,6 +41,7 @@ import java.util.stream.Collectors; * @since May 26, 2016. */ public class JdbcMetadataDaoImpl implements IMetadataDao { + private static final Logger LOG = LoggerFactory.getLogger(JdbcMetadataDaoImpl.class); private JdbcDatabaseHandler handler; @Inject @@ -101,12 +106,22 @@ public class JdbcMetadataDaoImpl implements IMetadataDao { @Override public ScheduleState getScheduleState(String versionId) { return handler.listWithFilter(versionId, ScheduleState.class); - //return null; } @Override public ScheduleState getScheduleState() { - return handler.listTop(ScheduleState.class, JdbcDatabaseHandler.SortType.DESC.toString()); + List<ScheduleState> scheduleStates = + handler.listOrderBy(ScheduleState.class, JdbcDatabaseHandler.SortType.DESC.toString()); + if (scheduleStates.isEmpty()) { + return null; + } else { + return scheduleStates.get(0); + } + } + + @Override + public List<ScheduleState> listScheduleStates() { + return handler.list(ScheduleState.class); } @Override @@ -160,6 +175,16 @@ public class JdbcMetadataDaoImpl implements IMetadataDao { } @Override + public OpResult clearScheduleState(int maxCapacity) { + if (maxCapacity <= 0) { + maxCapacity = 10; + } + OpResult result = handler.removeScheduleStates(maxCapacity); + LOG.info(result.message); + return result; + } + + @Override public OpResult addAssignment(PolicyAssignment assignment) { return handler.addOrReplace(PolicyAssignment.class.getSimpleName(), assignment); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java index 4568726..a02c51e 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java @@ -41,6 +41,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +@Deprecated public class JdbcSchemaManager { private static final Logger LOG = LoggerFactory.getLogger(JdbcSchemaManager.class); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java index af0494e..d639bff 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java @@ -43,6 +43,7 @@ import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.alert.metadata.MetadataUtils; import org.apache.eagle.alert.metadata.resource.Models; import org.apache.eagle.alert.metadata.resource.OpResult; + import org.bson.BsonDocument; import org.bson.BsonInt32; import org.bson.BsonString; @@ -501,6 +502,16 @@ public class MongoMetadataDaoImpl implements IMetadataDao { return state; } + @Override + public List<ScheduleState> listScheduleStates() { + throw new UnsupportedOperationException("listScheduleStates not support!"); + } + + @Override + public OpResult clearScheduleState(int maxCapacity) { + throw new UnsupportedOperationException("clearScheduleState not support!"); + } + private ScheduleState addDetailForScheduleState(ScheduleState state, String version) { Map<String, SpoutSpec> spoutMaps = maps(spoutSpecs, SpoutSpec.class, version); if (spoutMaps.size() != 0) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java index 7655f54..f45fd12 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java @@ -16,7 +16,6 @@ */ package org.apache.eagle.alert.metadata.impl; -import com.google.common.collect.Lists; import com.typesafe.config.ConfigFactory; import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; import org.apache.eagle.alert.coordination.model.ScheduleState; @@ -25,12 +24,11 @@ import org.apache.eagle.alert.engine.coordinator.StreamingCluster; import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.alert.metadata.resource.OpResult; + import org.junit.Assert; import org.junit.Test; import org.slf4j.LoggerFactory; -import java.util.List; - /** * @since May 1, 2016 */ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java index 7a2fcb5..9d188c4 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java @@ -20,6 +20,7 @@ package org.apache.eagle.alert.metadata.impl; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.commons.collections.CollectionUtils; import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; import org.apache.eagle.alert.coordination.model.ScheduleState; import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; @@ -29,15 +30,14 @@ import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.coordinator.PublishmentType; import org.apache.eagle.alert.engine.coordinator.StreamingCluster; import org.apache.eagle.alert.metadata.IMetadataDao; -import org.apache.eagle.alert.metadata.MetadataUtils; + import org.apache.eagle.alert.metadata.resource.OpResult; import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Date; -import java.util.List; +import java.util.*; public class JdbcImplTest { private static Logger LOG = LoggerFactory.getLogger(JdbcImplTest.class); @@ -45,10 +45,9 @@ public class JdbcImplTest { @BeforeClass public static void setup() { - System.setProperty("config.resource", "/application-mysql.conf"); ConfigFactory.invalidateCaches(); - Config config = ConfigFactory.load(); - dao = new JdbcMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA)); + Config config = ConfigFactory.load("application-jdbc.conf"); + dao = new JdbcMetadataDaoImpl(config); } @AfterClass @@ -64,7 +63,6 @@ public class JdbcImplTest { private String TOPO_NAME = "topoName"; - @Ignore @Test public void test_apis() { // publishment @@ -109,6 +107,8 @@ public class JdbcImplTest { Assert.assertEquals(200, result.code); List<StreamingCluster> assigns = dao.listClusters(); Assert.assertEquals(1, assigns.size()); + dao.removeCluster("dd"); + Assert.assertEquals(0, dao.listClusters().size()); } // data source { @@ -133,10 +133,20 @@ public class JdbcImplTest { { PublishmentType publishmentType = new PublishmentType(); publishmentType.setType("KAFKA"); + List<Map<String, String>> fields = new ArrayList<>(); + Map<String, String> field1 = new HashMap<>(); + field1.put("name", "kafka_broker"); + field1.put("value", "sandbox.hortonworks.com:6667"); + Map<String, String> field2 = new HashMap<>(); + field2.put("name", "topic"); + fields.add(field1); + fields.add(field2); + publishmentType.setFields(fields); OpResult result = dao.addPublishmentType(publishmentType); Assert.assertEquals(200, result.code); - List<PublishmentType> assigns = dao.listPublishmentType(); - Assert.assertEquals(1, assigns.size()); + List<PublishmentType> types = dao.listPublishmentType(); + Assert.assertEquals(1, types.size()); + Assert.assertEquals(2, types.get(0).getFields().size()); } } @@ -151,7 +161,6 @@ public class JdbcImplTest { Assert.assertEquals(state.getVersion(), versionId); } - @Ignore @Test public void test_readCurrentState() { test_addstate(); @@ -161,4 +170,26 @@ public class JdbcImplTest { LOG.debug(state.getVersion()); LOG.debug(state.getGenerateTime()); } + + @Test + public void test_clearScheduleState() { + int maxCapacity = 4; + List<String> reservedOnes = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + ScheduleState state = new ScheduleState(); + String versionId = "state-" + System.currentTimeMillis(); + state.setVersion(versionId); + state.setGenerateTime(String.valueOf(new Date().getTime())); + dao.addScheduleState(state); + if (i >= 10 - maxCapacity) { + reservedOnes.add(versionId); + } + } + dao.clearScheduleState(maxCapacity); + List<ScheduleState> scheduleStates = dao.listScheduleStates(); + Assert.assertTrue(scheduleStates.size() == maxCapacity); + List<String> TargetVersions = new ArrayList<>(); + scheduleStates.stream().forEach(state -> TargetVersions.add(state.getVersion())); + Assert.assertTrue(CollectionUtils.isEqualCollection(reservedOnes, TargetVersions)); + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-jdbc.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-jdbc.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-jdbc.conf new file mode 100644 index 0000000..9c71a28 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-jdbc.conf @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +metadata { + metadataDao = org.apache.eagle.alert.metadata.impl.JdbcMetadataDaoImpl + jdbc { + username = null + password = null + driverClassName = "org.h2.Driver" + connection = "jdbc:h2:mem:test;INIT=RUNSCRIPT FROM './src/test/resources/init.sql'" + connectionProperties = "encoding=UTF8;timeout=60" + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mysql.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mysql.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mysql.conf deleted file mode 100644 index 2a1aa2c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mysql.conf +++ /dev/null @@ -1,23 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -metadata { - metadataDao = org.apache.eagle.alert.metadata.impl.JdbcMetadataDaoImpl - jdbc { - connection = "jdbc:mysql://localhost:3306/alert_metadata?user=root&password=&createDatabaseIfNotExist=true" - database = "alert_metadata" - driverClassName = com.mysql.jdbc.Driver - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/init.sql ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/init.sql b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/init.sql new file mode 100644 index 0000000..90e9515 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/init.sql @@ -0,0 +1,70 @@ +-- /* +-- * Licensed to the Apache Software Foundation (ASF) under one or more +-- * contributor license agreements. See the NOTICE file distributed with +-- * this work for additional information regarding copyright ownership. +-- * The ASF licenses this file to You under the Apache License, Version 2.0 +-- * (the "License"); you may not use this file except in compliance with +-- * the License. You may obtain a copy of the License at +-- * +-- * http://www.apache.org/licenses/LICENSE-2.0 +-- * +-- * Unless required by applicable law or agreed to in writing, software +-- * distributed under the License is distributed on an "AS IS" BASIS, +-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- * See the License for the specific language governing permissions and +-- * limitations under the License. +-- * +-- */ + +CREATE TABLE IF NOT EXISTS cluster ( + id VARCHAR(50) PRIMARY KEY, + value longtext +); + + +CREATE TABLE IF NOT EXISTS stream_schema ( + id VARCHAR(50) PRIMARY KEY, + value longtext +); + +CREATE TABLE IF NOT EXISTS datasource ( + id VARCHAR(50) PRIMARY KEY, + value longtext +); + +CREATE TABLE IF NOT EXISTS policy ( + id VARCHAR(50) PRIMARY KEY, + value longtext +); + +CREATE TABLE IF NOT EXISTS publishment ( + id VARCHAR(50) PRIMARY KEY, + value longtext +); + + +CREATE TABLE IF NOT EXISTS publishment_type ( + id VARCHAR(50) PRIMARY KEY, + value longtext +); + + +CREATE TABLE IF NOT EXISTS schedule_state ( + id VARCHAR(50) PRIMARY KEY, + value longtext +); + +CREATE TABLE IF NOT EXISTS assignment ( + id VARCHAR(50) PRIMARY KEY, + value longtext +); + +CREATE TABLE IF NOT EXISTS topology ( + id VARCHAR(50) PRIMARY KEY, + value longtext +); + +CREATE TABLE IF NOT EXISTS alert_event ( + id VARCHAR(50) PRIMARY KEY, + value longtext +); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-server-assembly/src/main/conf/eagle.conf ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/conf/eagle.conf b/eagle-server-assembly/src/main/conf/eagle.conf index 9c804a6..5f6c240 100644 --- a/eagle-server-assembly/src/main/conf/eagle.conf +++ b/eagle-server-assembly/src/main/conf/eagle.conf @@ -17,21 +17,21 @@ # Eagle REST Web Service Configuration # --------------------------------------------- service { - env = "testing" - host = "localhost" - port = 9090 - username = "admin" - password = "secret" - readTimeOutSeconds = 60 - context = "/rest" - timezone = "UTC" + env = "testing" + host = "localhost" + port = 9090 + username = "admin" + password = "secret" + readTimeOutSeconds = 60 + context = "/rest" + timezone = "UTC" } zookeeper { - zkQuorum = "localhost:2181" - zkSessionTimeoutMs : 15000 - zkRetryTimes : 3 - zkRetryInterval : 20000 + zkQuorum = "localhost:2181" + zkSessionTimeoutMs : 15000 + zkRetryTimes : 3 + zkRetryInterval : 20000 } # --------------------------------------------- @@ -39,57 +39,57 @@ zookeeper { # --------------------------------------------- storage { - # storage type: ["hbase","jdbc"] - # default is "hbase" - type = "hbase" + # storage type: ["hbase","jdbc"] + # default is "hbase" + type = "hbase" - hbase { - # hbase configuration: hbase.zookeeper.quorum - # default is "localhost" - zookeeperQuorum = "localhost" + hbase { + # hbase configuration: hbase.zookeeper.quorum + # default is "localhost" + zookeeperQuorum = "localhost" - # hbase configuration: hbase.zookeeper.property.clientPort - # default is 2181 - zookeeperPropertyClientPort = 2181 + # hbase configuration: hbase.zookeeper.property.clientPort + # default is 2181 + zookeeperPropertyClientPort = 2181 - # hbase configuration: zookeeper.znode.parent - # default is "/hbase" - zookeeperZnodeParent = "/hbase-unsecure" + # hbase configuration: zookeeper.znode.parent + # default is "/hbase" + zookeeperZnodeParent = "/hbase-unsecure" - # eagle web login profile: [sandbox, default] - # default is sandbox - tableNamePrefixedWithEnvironment = false + # eagle web login profile: [sandbox, default] + # default is sandbox + tableNamePrefixedWithEnvironment = false - # eagle coprocessor enabled or not: [true, false] - # default is false - coprocessorEnabled = false - } + # eagle coprocessor enabled or not: [true, false] + # default is false + coprocessorEnabled = false + } } # --------------------------------------------- # Eagle Metadata Store Configuration # --------------------------------------------- metadata { - store = org.apache.eagle.metadata.service.memory.MemoryMetadataStore - jdbc { - username = "root" - password = "" - driverClassName = com.mysql.jdbc.Driver - url = "jdbc:mysql://server.eagle.apache.org:3306/eagle" - } + store = org.apache.eagle.metadata.service.memory.MemoryMetadataStore + jdbc { + username = "root" + password = "" + driverClassName = com.mysql.jdbc.Driver + url = "jdbc:mysql://server.eagle.apache.org:3306/eagle" + } } # --------------------------------------------- # Eagle Application Configuration # --------------------------------------------- application { - sink { - type = org.apache.eagle.app.sink.KafkaStreamSink - } - storm { - nimbusHost = "server.eagle.apache.org" - nimbusThriftPort = 6627 - } + sink { + type = org.apache.eagle.app.sink.KafkaStreamSink + } + storm { + nimbusHost = "server.eagle.apache.org" + nimbusThriftPort = 6627 + } } # --------------------------------------------- @@ -98,27 +98,29 @@ application { # Coordinator Configuration coordinator { - policiesPerBolt = 5 - boltParallelism = 5 - policyDefaultParallelism = 5 - boltLoadUpbound = 0.8 - topologyLoadUpbound = 0.8 - numOfAlertBoltsPerTopology = 5 - zkConfig { - zkQuorum = "server.eagle.apache.org:2181" - zkRoot = "/alert" - zkSessionTimeoutMs = 10000 - connectionTimeoutMs = 10000 - zkRetryTimes = 3 - zkRetryInterval = 3000 - } - metadataService { - host = "localhost", - port = 9090, - context = "/rest" - } - metadataDynamicCheck { - initDelayMillis = 1000 - delayMillis = 30000 + policiesPerBolt = 5 + boltParallelism = 5 + policyDefaultParallelism = 5 + boltLoadUpbound = 0.8 + topologyLoadUpbound = 0.8 + numOfAlertBoltsPerTopology = 5 + zkConfig { + zkQuorum = "server.eagle.apache.org:2181" + zkRoot = "/alert" + zkSessionTimeoutMs = 10000 + connectionTimeoutMs = 10000 + zkRetryTimes = 3 + zkRetryInterval = 3000 + } + metadataService { + host = "localhost", + port = 9090, + context = "/rest" + } + metadataDynamicCheck { + initDelayMillis = 1000 + delayMillis = 30000 + stateClearPeriodMin = 1440 + stateReservedCapacity = 100 } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-server/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/resources/application.conf b/eagle-server/src/main/resources/application.conf index 91757d7..ce68550 100644 --- a/eagle-server/src/main/resources/application.conf +++ b/eagle-server/src/main/resources/application.conf @@ -139,5 +139,7 @@ coordinator { metadataDynamicCheck { initDelayMillis = 1000 delayMillis = 30000 + stateClearPeriodMin = 1440 + stateReservedCapacity = 100 } }