[EAGLE-895] Improve alert engine metadata to organize by siteId https://issues.apache.org/jira/browse/EAGLE-895
Author: Hao Chen <[email protected]> Closes #801 from haoch/AddPolicySiteId. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/49ca3b0e Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/49ca3b0e Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/49ca3b0e Branch: refs/heads/master Commit: 49ca3b0ec481f6fcfbf339cb6d3b63b4dede1011 Parents: 7681287 Author: Hao Chen <[email protected]> Authored: Thu Feb 16 13:24:08 2017 +0800 Committer: Hao Chen <[email protected]> Committed: Thu Feb 16 13:24:08 2017 +0800 ---------------------------------------------------------------------- .../eagle/alert/app/AlertEagleStorePlugin.java | 16 +- .../app/AlertUnitTopologyAppProviderTest.java | 2 +- .../engine/coordinator/PolicyDefinition.java | 20 +- .../engine/coordinator/PublishmentType.java | 73 +- .../coordinator/PolicyDefinitionTest.java | 2 +- .../publisher/AlertPublishPluginProvider.java | 24 + .../publisher/PublishementTypeLoader.java | 58 ++ .../publisher/impl/AlertEagleStorePlugin.java | 12 +- .../publisher/impl/AlertEmailPublisher.java | 16 +- .../publisher/impl/AlertFilePublisher.java | 13 +- .../publisher/impl/AlertKafkaPublisher.java | 17 +- .../publisher/impl/AlertSlackPublisher.java | 17 +- .../publisher/PublishementTypeLoaderTest.java | 27 + .../metadata/resource/MetadataResource.java | 51 +- .../resource/StreamDefinitionWrapper.java | 72 ++ .../eagle/alert/metadata/IMetadataDao.java | 8 + .../environment/impl/StormExecutionRuntime.java | 3 +- .../ApplicationStatusUpdateServiceImpl.java | 73 +- .../eagle/app/spi/ApplicationProvider.java | 2 +- .../app/test/ApplicationSimulatorImpl.java | 3 +- .../eagle/app/test/ApplicationTestBase.java | 25 + .../app/resource/ApplicationResourceTest.java | 2 +- eagle-core/eagle-common/pom.xml | 4 + .../eagle/common/utils/ReflectionsHelper.java | 48 ++ .../eagle/service/hbase/EmbeddedHbase.java | 7 +- .../eagle/service/hbase/EmbeddedHbaseTest.java | 6 +- .../eagle/service/hbase/TestHBaseBase.java | 44 +- .../service/ApplicationStatusUpdateService.java | 2 - .../client/impl/EagleServiceClientImpl.java | 9 +- .../eagle/service/client/ClientTestBase.java | 2 +- .../eagle-query/eagle-entity-base/pom.xml | 28 +- .../entity/repo/EntityRepositoryScanner.java | 92 ++- .../TestGenericEntityIndexStreamReader.java | 14 +- .../eagle/log/entity/TestTestLogAPIEntity.java | 735 ++++++++++--------- .../repo/TestEntityRepositoryScanner.java | 5 +- .../eagle/storage/hbase/TestHBaseStatement.java | 17 +- .../storage/hbase/TestWithHBaseCoprocessor.java | 77 ++ .../coprocessor/TestGroupAggregateClient.java | 60 +- .../TestGroupAggregateTimeSeriesClient.java | 26 +- .../storage/hbase/spi/TestHBaseStorage.java | 22 +- ...estHBaseStorageAggregateWithCoprocessor.java | 36 +- .../hbase/spi/TestHBaseStorageLoader.java | 17 +- .../src/test/resources/log4j.properties | 2 +- .../example/ExampleApplicationProviderTest.java | 5 +- .../eagle/app/jpm/JPMWebApplicationTest.java | 1 + .../auditlog/TestHdfsAuditLogApplication.java | 7 +- .../apache/eagle/server/ServerApplication.java | 8 + pom.xml | 14 +- 48 files changed, 1190 insertions(+), 634 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java index 30d2b78..0b58bf7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java @@ -19,7 +19,9 @@ package org.apache.eagle.alert.app; import com.google.common.base.Preconditions; import com.typesafe.config.Config; import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.PublishmentType; import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider; import org.apache.eagle.alert.engine.publisher.impl.AbstractPublishPlugin; import org.apache.eagle.alert.utils.AlertConstants; import org.apache.eagle.metadata.model.AlertEntity; @@ -36,15 +38,14 @@ import java.util.Map; import static org.apache.eagle.alert.engine.model.AlertPublishEvent.*; -public class AlertEagleStorePlugin extends AbstractPublishPlugin { +public class AlertEagleStorePlugin extends AbstractPublishPlugin implements AlertPublishPluginProvider { private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class); private IEagleServiceClient client; @Override public void init(Config config, Publishment publishment, Map conf) throws Exception { super.init(config, publishment, conf); - client = new EagleServiceClientImpl(config.getString("service.host"), config.getInt("service.port"), - config.getString("service.username"), config.getString("service.password")); + client = new EagleServiceClientImpl(config); } @Override @@ -94,4 +95,13 @@ public class AlertEagleStorePlugin extends AbstractPublishPlugin { alertEvent.setTags(tags); return alertEvent; } + + @Override + public PublishmentType getPluginType() { + return new PublishmentType.Builder() + .name("HBaseStorage") + .type(getClass()) + .description("HBase Storage alert publisher") + .build(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java b/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java index 927d505..4383484 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java @@ -56,7 +56,7 @@ public class AlertUnitTopologyAppProviderTest extends ApplicationTestBase { statusUpdateService.updateApplicationEntityStatus(applicationEntity); // Stop application applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid())); - statusUpdateService.updateApplicationEntityStatus(applicationEntity); + awaitApplicationStop(applicationEntity); // Uninstall application applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid())); try { http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java index 7398dd5..c377e41 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java @@ -35,8 +35,9 @@ public class PolicyDefinition implements Serializable { @Length(min = 1, max = 50, message = "length should between 1 and 50") private String name; private String description; - private List<String> inputStreams = new ArrayList<String>(); - private List<String> outputStreams = new ArrayList<String>(); + private List<String> inputStreams = new ArrayList<>(); + private List<String> outputStreams = new ArrayList<>(); + private String siteId = "default"; private Definition definition; private Definition stateDefinition; @@ -137,6 +138,7 @@ public class PolicyDefinition implements Serializable { @Override public int hashCode() { return new HashCodeBuilder() + .append(siteId) .append(name) .append(inputStreams) .append(outputStreams) @@ -160,7 +162,8 @@ public class PolicyDefinition implements Serializable { PolicyDefinition another = (PolicyDefinition) that; - if (Objects.equals(another.name, this.name) + if (Objects.equals(another.siteId, this.siteId) + && Objects.equals(another.name, this.name) && Objects.equals(another.description, this.description) && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams) && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams) @@ -191,6 +194,14 @@ public class PolicyDefinition implements Serializable { return alertDefinition == null ? null : alertDefinition.getCategory(); } + public String getSiteId() { + return siteId; + } + + public void setSiteId(String siteId) { + this.siteId = siteId; + } + @JsonIgnoreProperties(ignoreUnknown = true) public static class Definition implements Serializable { private static final long serialVersionUID = -622366527887848346L; @@ -294,9 +305,8 @@ public class PolicyDefinition implements Serializable { ENABLED, DISABLED } - @Override public String toString() { - return String.format("{name=\"%s\",definition=%s}", this.getName(), this.getDefinition() == null ? "null" : this.getDefinition().toString()); + return String.format("{site=\"%s\", name=\"%s\",definition=%s}", this.getSiteId(), this.getName(), this.getDefinition() == null ? "null" : this.getDefinition().toString()); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java index 5bd15bc..f7025f2 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java @@ -21,16 +21,25 @@ package org.apache.eagle.alert.engine.coordinator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.commons.lang3.builder.HashCodeBuilder; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; @JsonIgnoreProperties(ignoreUnknown = true) public class PublishmentType { private String name; + + @Override + public String toString() { + return "PublishmentType{" + + "name='" + name + '\'' + + ", type='" + type + '\'' + + ", description='" + description + '\'' + + ", fields=" + fields + + '}'; + } + private String type; private String description; - private List<Map<String, String>> fields; + private List<Map<String, String>> fields = new LinkedList<>(); public String getName() { return name; @@ -64,6 +73,8 @@ public class PublishmentType { this.fields = fields; } + + @Override public boolean equals(Object obj) { if (obj instanceof PublishmentType) { @@ -85,4 +96,56 @@ public class PublishmentType { .append(fields) .build(); } -} + + + public static class Builder { + private final PublishmentType publishmentType; + + public Builder() { + this.publishmentType = new PublishmentType(); + } + + public Builder type(Class<?> typeClass) { + this.publishmentType.setType(typeClass.getName()); + return this; + } + + public Builder name(String name) { + this.publishmentType.setName(name); + return this; + } + + public Builder description(String description) { + this.publishmentType.setDescription(description); + return this; + } + + public Builder field(Map<String,String> fieldDesc) { + this.publishmentType.getFields().add(fieldDesc); + return this; + } + + public Builder field(String name, String value) { + this.publishmentType.getFields().add(new HashMap<String,String>() { + { + put("name", name); + put("value", value); + } + }); + return this; + } + + public Builder field(String name) { + this.publishmentType.getFields().add(new HashMap<String,String>() { + { + put("name", name); + } + }); + return this; + } + + public PublishmentType build() { + return this.publishmentType; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java index 7acb4f7..77b3517 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java @@ -76,7 +76,7 @@ public class PolicyDefinitionTest { sp.setColumns(Arrays.asList("host")); sp.setType(StreamPartition.Type.GROUPBY); pd.addPartition(sp); - Assert.assertEquals("{name=\"policyName\",definition={type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }}", pd.toString()); + Assert.assertEquals("{site=\"default\", name=\"policyName\",definition={type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }}", pd.toString()); PolicyDefinition pd1 = new PolicyDefinition(); PolicyDefinition.Definition def1 = new PolicyDefinition.Definition(); http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPluginProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPluginProvider.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPluginProvider.java new file mode 100644 index 0000000..77eea40 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPluginProvider.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.engine.publisher; + +import org.apache.eagle.alert.engine.coordinator.PublishmentType; + +public interface AlertPublishPluginProvider { + PublishmentType getPluginType(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoader.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoader.java new file mode 100644 index 0000000..820d70e --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoader.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.engine.publisher; + +import org.apache.eagle.alert.engine.coordinator.PublishmentType; +import org.apache.eagle.common.utils.ReflectionsHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; + +public class PublishementTypeLoader { + private static final Logger LOGGER = LoggerFactory.getLogger(PublishementTypeLoader.class); + + private final List<PublishmentType> publishmentTypeSet; + + private PublishementTypeLoader() { + this.publishmentTypeSet = new LinkedList<>(); + LOGGER.info("Loading alert publish plugins ..."); + for (Class<? extends AlertPublishPluginProvider> clazz: ReflectionsHelper.getInstance().getSubTypesOf(AlertPublishPluginProvider.class)) { + LOGGER.debug("Loading alert publish plugin: {}", clazz); + try { + PublishmentType type = clazz.newInstance().getPluginType(); + this.publishmentTypeSet.add(type); + LOGGER.info("Loaded alert publish plugin {}:{}", type.getName(), type.getType()); + } catch (InstantiationException | IllegalAccessException e) { + LOGGER.error("Failed to get instantiate alert publish plugin provider: {}", clazz, e); + } + } + LOGGER.info("Loaded {} alert publish plugins", this.publishmentTypeSet.size()); + } + + private static final PublishementTypeLoader INSTANCE = new PublishementTypeLoader(); + + public static List<PublishmentType> loadPublishmentTypes() { + return INSTANCE.getPublishmentTypes(); + } + + public List<PublishmentType> getPublishmentTypes() { + return publishmentTypeSet; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java index 48c3663..b410cda 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java @@ -20,8 +20,10 @@ package org.apache.eagle.alert.engine.publisher.impl; import com.typesafe.config.Config; import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.PublishmentType; import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider; import org.apache.eagle.alert.engine.publisher.PublishConstants; import org.apache.eagle.alert.service.IMetadataServiceClient; import org.apache.eagle.alert.service.MetadataServiceClientImpl; @@ -34,7 +36,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -public class AlertEagleStorePlugin extends AbstractPublishPlugin { +public class AlertEagleStorePlugin extends AbstractPublishPlugin implements AlertPublishPluginProvider { private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class); private transient IMetadataServiceClient client; @@ -72,4 +74,12 @@ public class AlertEagleStorePlugin extends AbstractPublishPlugin { return LOG; } + @Override + public PublishmentType getPluginType() { + return new PublishmentType.Builder() + .name("JDBCStorage") + .type(getClass()) + .description("Publish alerts into eagle metadata store") + .build(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java index d08d114..152a9f1 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java @@ -19,7 +19,9 @@ package org.apache.eagle.alert.engine.publisher.impl; import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.PublishmentType; import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider; import org.apache.eagle.alert.engine.publisher.PublishConstants; import org.apache.eagle.alert.engine.publisher.email.AlertEmailConstants; import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator; @@ -41,7 +43,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.eagle.alert.service.MetadataServiceClientImpl.*; import static org.apache.eagle.common.mail.AlertEmailConstants.*; -public class AlertEmailPublisher extends AbstractPublishPlugin { +public class AlertEmailPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider { private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class); private static final int DEFAULT_THREAD_POOL_CORE_SIZE = 4; @@ -206,4 +208,16 @@ public class AlertEmailPublisher extends AbstractPublishPlugin { protected Logger getLogger() { return LOG; } + + @Override + public PublishmentType getPluginType() { + return new PublishmentType.Builder() + .name("Email") + .type(AlertEmailPublisher.class) + .description("Email alert publisher") + .field("subject") + .field("sender") + .field("recipients") + .build(); + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java index 1848979..375a0da 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java @@ -19,8 +19,10 @@ package org.apache.eagle.alert.engine.publisher.impl; import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.PublishmentType; import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider; import org.apache.eagle.alert.engine.publisher.PublishConstants; import org.apache.eagle.common.DateTimeUtil; @@ -33,7 +35,7 @@ import java.util.List; import java.util.Map; import java.util.logging.*; -public class AlertFilePublisher extends AbstractPublishPlugin { +public class AlertFilePublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider { private Logger filelogger = Logger.getLogger(AlertFilePublisher.class.getName()); private FileHandler handler; @@ -67,6 +69,15 @@ public class AlertFilePublisher extends AbstractPublishPlugin { filelogger.setUseParentHandlers(false); } + @Override + public PublishmentType getPluginType() { + return new PublishmentType.Builder() + .name("File") + .type(AlertFilePublisher.class) + .description("Local log file publisher") + .build(); + } + class AlertFileFormatter extends Formatter { @Override http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java index e48f2eb..adac1aa 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java @@ -26,7 +26,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.PublishmentType; import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider; import org.apache.eagle.alert.engine.publisher.PublishConstants; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; @@ -37,7 +39,7 @@ import org.slf4j.LoggerFactory; import com.typesafe.config.Config; -public class AlertKafkaPublisher extends AbstractPublishPlugin { +public class AlertKafkaPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider { private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPublisher.class); private static final long MAX_TIMEOUT_MS = 60000; @@ -181,4 +183,15 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin { protected Logger getLogger() { return LOG; } -} + + @Override + public PublishmentType getPluginType() { + return new PublishmentType.Builder() + .name("Kafka") + .type(getClass()) + .description("Kafka alert publisher") + .field("kafka_broker","localhost:9092") + .field("topic") + .build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java index 6ce6ed7..0d60246 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java @@ -25,8 +25,10 @@ import com.ullink.slack.simpleslackapi.SlackSession; import com.ullink.slack.simpleslackapi.impl.SlackSessionFactory; import org.apache.commons.lang.StringUtils; import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.PublishmentType; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider; import org.apache.eagle.alert.engine.publisher.PublishConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +41,7 @@ import java.util.Map; /** * @since Sep 14, 2016. */ -public class AlertSlackPublisher extends AbstractPublishPlugin { +public class AlertSlackPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider { private static final Logger LOG = LoggerFactory.getLogger(AlertSlackPublisher.class); private SlackSession session; @@ -155,4 +157,17 @@ public class AlertSlackPublisher extends AbstractPublishPlugin { SlackChannel channel = session.findChannelByName(channelName); session.sendMessage(channel, message, attachment); } + + @Override + public PublishmentType getPluginType() { + return new PublishmentType.Builder() + .name("Slack") + .type(getClass()) + .description("Slack alert publisher") + .field("token") + .field("channels") + .field("severitys") + .field("urltemplate") + .build(); + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java new file mode 100644 index 0000000..3df5fc8 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.engine.publisher; + +import org.junit.Test; + +public class PublishementTypeLoaderTest { + @Test + public void testPublishmentTypeLoader() { + PublishementTypeLoader.loadPublishmentTypes(); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java index 617b4f0..2d30e85 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java @@ -16,6 +16,9 @@ */ package org.apache.eagle.service.metadata.resource; +import com.google.common.base.Preconditions; +import com.google.inject.Inject; +import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; import org.apache.eagle.alert.coordination.model.ScheduleState; import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; @@ -25,19 +28,20 @@ import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter; import org.apache.eagle.alert.engine.interpreter.PolicyParseResult; import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult; import org.apache.eagle.alert.engine.model.AlertPublishEvent; +import org.apache.eagle.alert.engine.publisher.PublishementTypeLoader; import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory; import org.apache.eagle.alert.metadata.resource.Models; import org.apache.eagle.alert.metadata.resource.OpResult; - -import com.google.common.base.Preconditions; -import com.google.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; import javax.validation.Valid; import javax.ws.rs.*; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; /** * @since Apr 11, 2016. @@ -135,6 +139,27 @@ public class MetadataResource { return dao.createStream(stream); } + @Path("/streams/create") + @POST + public OpResult createStream(StreamDefinitionWrapper stream) { + Preconditions.checkNotNull(stream.getStreamDefinition(),"Stream definition is null"); + Preconditions.checkNotNull(stream.getStreamSource(),"Stream source is null"); + stream.validateAndEnsureDefault(); + OpResult createStreamResult = dao.createStream(stream.getStreamDefinition()); + OpResult createDataSourceResult = dao.addDataSource(stream.getStreamSource()); + // TODO: Check kafka topic exist or not. + if (createStreamResult.code == OpResult.SUCCESS + && createDataSourceResult.code == OpResult.SUCCESS) { + return OpResult.success("Successfully create stream " + + stream.getStreamDefinition().getStreamId() + + ", and datasource " + + stream.getStreamSource().getName()); + } else { + return OpResult.fail("Error: " + + StringUtils.join(new String[]{createDataSourceResult.message, createDataSourceResult.message},",")); + } + } + @Path("/streams/batch") @POST public List<OpResult> addStreams(List<StreamDefinition> streams) { @@ -201,8 +226,12 @@ public class MetadataResource { @Path("/policies") @GET - public List<PolicyDefinition> listPolicies() { - return dao.listPolicies(); + public List<PolicyDefinition> listPolicies(@QueryParam("siteId") String siteId) { + if (siteId != null) { + return dao.getPoliciesBySiteId(siteId); + } else { + return dao.listPolicies(); + } } @Path("/policies") @@ -281,7 +310,7 @@ public class MetadataResource { try { PolicyDefinition policyDefinition = getPolicyById(policyId); policyDefinition.setPolicyStatus(status); - OpResult updateResult = addPolicy(policyDefinition); + OpResult updateResult = addPolicy(policyDefinition); result.code = updateResult.code; if (result.code == OpResult.SUCCESS) { @@ -292,7 +321,7 @@ public class MetadataResource { LOG.error(result.message); } } catch (Exception e) { - LOG.error("Error: " + e.getMessage(),e); + LOG.error("Error: " + e.getMessage(), e); result.code = OpResult.FAILURE; result.message = e.getMessage(); } @@ -350,17 +379,19 @@ public class MetadataResource { @Path("/publishmentTypes") @GET public List<PublishmentType> listPublishmentType() { - return dao.listPublishmentType(); + return PublishementTypeLoader.loadPublishmentTypes(); } @Path("/publishmentTypes") @POST + @Deprecated public OpResult addPublishmentType(PublishmentType publishmentType) { return dao.addPublishmentType(publishmentType); } @Path("/publishmentTypes/batch") @POST + @Deprecated public List<OpResult> addPublishmentTypes(List<PublishmentType> publishmentTypes) { List<OpResult> results = new LinkedList<>(); for (PublishmentType pubType : publishmentTypes) { @@ -371,12 +402,14 @@ public class MetadataResource { @Path("/publishmentTypes/{name}") @DELETE + @Deprecated public OpResult removePublishmentType(@PathParam("name") String name) { return dao.removePublishmentType(name); } @Path("/publishmentTypes") @DELETE + @Deprecated public List<OpResult> removePublishmentTypes(List<String> pubTypes) { List<OpResult> results = new LinkedList<>(); for (String pubType : pubTypes) { http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java new file mode 100644 index 0000000..738c978 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.service.metadata.resource; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; +import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; +import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector; + +import java.util.Properties; + +public class StreamDefinitionWrapper { + private Kafka2TupleMetadata streamSource; + private StreamDefinition streamDefinition; + + public Kafka2TupleMetadata getStreamSource() { + return streamSource; + } + + public void setStreamSource(Kafka2TupleMetadata streamSource) { + this.streamSource = streamSource; + } + + public StreamDefinition getStreamDefinition() { + return streamDefinition; + } + + public void setStreamDefinition(StreamDefinition streamDefinition) { + this.streamDefinition = streamDefinition; + } + + public void validateAndEnsureDefault() { + Preconditions.checkNotNull(streamSource); + Preconditions.checkNotNull(streamDefinition); + if (streamSource.getType() == null) { + streamSource.setType("KAFKA"); + } + String dataSourceName = (getStreamDefinition().getStreamId() + "_CUSTOMIZED").toUpperCase(); + getStreamDefinition().setDataSource(dataSourceName); + getStreamSource().setName(dataSourceName); + Tuple2StreamMetadata codec = new Tuple2StreamMetadata(); + codec.setTimestampColumn("timestamp"); + codec.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getName()); + Properties streamNameSelectorProp = new Properties(); + streamNameSelectorProp.put("userProvidedStreamName", streamSource.getName()); + codec.setStreamNameSelectorProp(streamNameSelectorProp); + if (StringUtils.isBlank(codec.getStreamNameSelectorCls())) { + codec.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getName()); + } + if (StringUtils.isBlank(codec.getTimestampFormat())) { + codec.setTimestampFormat(null); + } + this.streamSource.setCodec(codec); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java index 2dc7f51..2d2a90f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java @@ -75,10 +75,13 @@ public interface IMetadataDao extends Closeable { OpResult removePublishment(String pubId); + @Deprecated List<PublishmentType> listPublishmentType(); + @Deprecated OpResult addPublishmentType(PublishmentType publishmentType); + @Deprecated OpResult removePublishmentType(String pubType); List<AlertPublishEvent> listAlertPublishEvent(int size); @@ -190,4 +193,9 @@ public interface IMetadataDao extends Closeable { } return result; } + + default List<PolicyDefinition> getPoliciesBySiteId(String siteId) { + Preconditions.checkNotNull(siteId,"siteId"); + return listPolicies().stream().filter(pc -> pc.getSiteId().equals(siteId)).collect(Collectors.toList()); + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java index f61a291..2b4180d 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java @@ -186,9 +186,10 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment, } else if (topologySummary.get_status().equalsIgnoreCase("INACTIVE")) { status = ApplicationEntity.Status.STOPPED; } else if (topologySummary.get_status().equalsIgnoreCase("KILLED")) { - status = ApplicationEntity.Status.STOPPED; + status = ApplicationEntity.Status.STOPPING; } else { LOG.error("Unknown storm topology ({}) status: {}", topologySummary.get_status(),topologySummary.get_status()); + status = ApplicationEntity.Status.UNKNOWN; } } } http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java index 02c3a5e..b5bec1b 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java @@ -71,51 +71,60 @@ public class ApplicationStatusUpdateServiceImpl extends ApplicationStatusUpdateS } @Override - public void updateApplicationEntityStatus(Collection<ApplicationEntity> applicationEntities) { - } - - @Override public void updateApplicationEntityStatus(ApplicationEntity applicationEntity) { String appUuid = applicationEntity.getUuid(); - ApplicationEntity.Status currentStatus = applicationEntity.getStatus(); + ApplicationEntity.Status preStatus = applicationEntity.getStatus(); try { - ApplicationEntity.Status topologyStatus = applicationManagementService.getStatus(new ApplicationOperations.CheckStatusOperation(appUuid)); - if (currentStatus == ApplicationEntity.Status.STARTING) { - if (topologyStatus == ApplicationEntity.Status.RUNNING) { - applicationEntityService.delete(applicationEntity); - applicationEntity.setStatus(ApplicationEntity.Status.RUNNING); - applicationEntityService.create(applicationEntity); + ApplicationEntity.Status currentStatus = applicationManagementService.getStatus(new ApplicationOperations.CheckStatusOperation(appUuid)); + if (preStatus == ApplicationEntity.Status.STARTING) { + if (currentStatus == ApplicationEntity.Status.RUNNING) { + // applicationEntityService.delete(applicationEntity); + // applicationEntity.setStatus(ApplicationEntity.Status.RUNNING); + // applicationEntityService.create(applicationEntity); + currentStatus = ApplicationEntity.Status.RUNNING; // handle the topology corruption case: - } else if (topologyStatus == ApplicationEntity.Status.REMOVED) { - applicationEntityService.delete(applicationEntity); - applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED); - applicationEntityService.create(applicationEntity); + } else if (currentStatus == ApplicationEntity.Status.REMOVED) { + // applicationEntityService.delete(applicationEntity); + // applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED); + // applicationEntityService.create(applicationEntity); + currentStatus = ApplicationEntity.Status.INITIALIZED; } - } else if (currentStatus == ApplicationEntity.Status.STOPPING) { - if (topologyStatus == ApplicationEntity.Status.REMOVED) { - applicationEntityService.delete(applicationEntity); - applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED); - applicationEntityService.create(applicationEntity); + } else if (preStatus == ApplicationEntity.Status.STOPPING) { + if (currentStatus == ApplicationEntity.Status.REMOVED) { + // applicationEntityService.delete(applicationEntity); + // applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED); + // applicationEntityService.create(applicationEntity); + currentStatus = ApplicationEntity.Status.INITIALIZED; } - } else if (currentStatus == ApplicationEntity.Status.RUNNING) { + } else if (preStatus == ApplicationEntity.Status.RUNNING) { // handle the topology corruption case: - if (topologyStatus == ApplicationEntity.Status.REMOVED) { - applicationEntityService.delete(applicationEntity); - applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED); - applicationEntityService.create(applicationEntity); + if (currentStatus == ApplicationEntity.Status.REMOVED) { + // applicationEntityService.delete(applicationEntity); + // applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED); + // applicationEntityService.create(applicationEntity); + currentStatus = ApplicationEntity.Status.INITIALIZED; } - } else if (currentStatus == ApplicationEntity.Status.INITIALIZED) { + } else if (preStatus == ApplicationEntity.Status.INITIALIZED) { //corner case: when Storm service go down, app status-> initialized, //then when storm server is up again, storm topology will be launched automatically->active - if (topologyStatus == ApplicationEntity.Status.RUNNING) { - applicationEntityService.delete(applicationEntity); - applicationEntity.setStatus(ApplicationEntity.Status.RUNNING); - applicationEntityService.create(applicationEntity); + if (currentStatus == ApplicationEntity.Status.RUNNING) { + // applicationEntityService.delete(applicationEntity); + // applicationEntity.setStatus(ApplicationEntity.Status.RUNNING); + // applicationEntityService.create(applicationEntity); + currentStatus = ApplicationEntity.Status.RUNNING; } } - // "STOPPED" is not used in Eagle, so just do nothing. - applicationEntity.setStatus(topologyStatus); + if (currentStatus == ApplicationEntity.Status.REMOVED) { + currentStatus = ApplicationEntity.Status.INITIALIZED; + } + + // "STOPPED" is not used in Eagle, so just do nothing. + if (preStatus != currentStatus) { + LOG.info("Application {} status changed from {} to {}", applicationEntity.getAppId(), preStatus, currentStatus); + } + applicationEntity.setStatus(currentStatus); + applicationEntityService.update(applicationEntity); } catch (RuntimeException e) { LOG.error(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java index eff232a..0172498 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java @@ -30,7 +30,7 @@ import java.util.List; import java.util.Optional; /** - * Application Service KafkaStreamMessaging Interface. + * Application Service Provider Interface (SPI) * * @param <T> Application Type. */ http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java index 1b066ef..a5f5a73 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java @@ -79,7 +79,8 @@ public class ApplicationSimulatorImpl extends ApplicationSimulator { while (attempt < 10) { attempt++; statusUpdateService.updateApplicationEntityStatus(applicationEntity); - if (applicationEntity.getStatus() == ApplicationEntity.Status.STOPPED) { + if (applicationEntity.getStatus() == ApplicationEntity.Status.STOPPED + || applicationEntity.getStatus() == ApplicationEntity.Status.INITIALIZED) { break; } else { try { http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java index 6bc73fc..52b8e79 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java @@ -17,12 +17,20 @@ package org.apache.eagle.app.test; import com.google.inject.Guice; +import com.google.inject.Inject; import com.google.inject.Injector; +import org.apache.eagle.metadata.model.ApplicationEntity; +import org.apache.eagle.metadata.service.ApplicationStatusUpdateService; +import org.junit.Assert; import org.junit.Before; public class ApplicationTestBase { private Injector injector; + + @Inject + ApplicationStatusUpdateService statusUpdateService; + @Before public void setUp() { injector = Guice.createInjector(new ApplicationTestGuiceModule()); @@ -32,4 +40,21 @@ public class ApplicationTestBase { protected Injector injector() { return injector; } + + protected void awaitApplicationStop(ApplicationEntity applicationEntity) throws InterruptedException { + int attempt = 0; + while (attempt < 10) { + attempt ++; + if (applicationEntity.getStatus() == ApplicationEntity.Status.STOPPED + || applicationEntity.getStatus() == ApplicationEntity.Status.INITIALIZED) { + break; + } else { + statusUpdateService.updateApplicationEntityStatus(applicationEntity); + Thread.sleep(1000); + } + } + if (attempt > 10) { + Assert.fail("Failed to wait for application to STOPPED after 10 attempts"); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java index 59925fd..6c68cd2 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java +++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java @@ -59,7 +59,7 @@ public class ApplicationResourceTest extends ApplicationTestBase { statusUpdateService.updateApplicationEntityStatus(applicationEntity); // Stop application applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid())); - statusUpdateService.updateApplicationEntityStatus(applicationEntity); + awaitApplicationStop(applicationEntity); // Uninstall application applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid())); try { http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-common/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-common/pom.xml b/eagle-core/eagle-common/pom.xml index 6ab250e..2b72f44 100644 --- a/eagle-core/eagle-common/pom.xml +++ b/eagle-core/eagle-common/pom.xml @@ -105,6 +105,10 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/ReflectionsHelper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/ReflectionsHelper.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/ReflectionsHelper.java new file mode 100644 index 0000000..facf07a --- /dev/null +++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/ReflectionsHelper.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.common.utils; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.reflections.Reflections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReflectionsHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(ReflectionsHelper.class); + private final Reflections reflections; + private static final String DEFAULT_PACKAGE = "org.apache.eagle"; + + private ReflectionsHelper() { + Config config = ConfigFactory.load(); + String[] packages; + if (config.hasPath("scanPackages")) { + packages = config.getString("scanPackages").split(","); + } else { + packages = new String[]{DEFAULT_PACKAGE}; + } + LOGGER.info("Scanning packages: {}", packages); + this.reflections = new Reflections(packages); + } + + private static ReflectionsHelper INSTANCE = new ReflectionsHelper(); + + public static Reflections getInstance() { + return INSTANCE.reflections; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java index 84661e9..0aeac2c 100644 --- a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java +++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java @@ -23,6 +23,9 @@ import org.apache.hadoop.hbase.MiniHBaseCluster; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + +@Deprecated public class EmbeddedHbase { private HBaseTestingUtility util; private MiniHBaseCluster hbaseCluster; @@ -58,7 +61,7 @@ public class EmbeddedHbase { return getInstance(null); } - private EmbeddedHbase() { + public EmbeddedHbase() { this(DEFAULT_PORT, DEFAULT_ZNODE); } @@ -115,7 +118,7 @@ public class EmbeddedHbase { public void createTable(String tableName, String cf) { try { util.createTable(tableName, cf); - } catch (Exception ex) { + } catch (IOException ex) { LOG.warn("Create table failed, probably table already existed, table name: " + tableName); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java index e65f062..ee9d32d 100644 --- a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java +++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java @@ -18,9 +18,11 @@ package org.apache.eagle.service.hbase; import org.junit.Test; +import java.io.IOException; + public class EmbeddedHbaseTest extends TestHBaseBase { @Test - public void testHBaseCreateTable() { - // hbase.createTable("test_hbase_table","f"); + public void testHBaseCreateTable() throws IOException { + hbase.createTable("test_hbase_table","f"); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java index 31af2a1..35c0a38 100644 --- a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java +++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java @@ -17,30 +17,48 @@ package org.apache.eagle.service.hbase; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.junit.AfterClass; -import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - -@Ignore public class TestHBaseBase { - protected static EmbeddedHbase hbase; + private static final Logger LOGGER = LoggerFactory.getLogger(TestHBaseBase.class); + protected static HBaseTestingUtility hbase; - @BeforeClass - public static void setUpHBase() { - hbase = EmbeddedHbase.getInstance(); + protected static String getZkZnodeParent() { + return "/hbase-test"; } - public static void setupHBaseWithConfig(Configuration config) { - Assert.assertTrue("HBase test mini cluster should not start", null == hbase); - hbase = EmbeddedHbase.getInstance(config); + @BeforeClass + public static void setUpHBase() { + Configuration configuration = HBaseConfiguration.create(); + configuration.set("zookeeper.znode.parent", getZkZnodeParent()); + configuration.setInt("hbase.master.info.port", -1);//avoid port clobbering + configuration.setInt("hbase.regionserver.info.port", -1);//avoid port clobbering + hbase = new HBaseTestingUtility(configuration); + try { + hbase.startMiniCluster(); + } catch (Exception e) { + LOGGER.error("Error to start hbase mini cluster: " + e.getMessage(), e); + throw new IllegalStateException(e); + } + System.setProperty("storage.hbase.autoCreateTable","false"); + System.setProperty("storage.hbase.zookeeperZnodeParent", getZkZnodeParent()); + System.setProperty("storage.hbase.zookeeperPropertyClientPort", String.valueOf(hbase.getZkCluster().getClientPort())); } @AfterClass public static void shutdownHBase() { - if (hbase != null) { - hbase.shutdown(); + try { + hbase.shutdownMiniCluster(); + } catch (Exception e) { + LOGGER.error("Error to shutdown mini hbase cluster: " + e.getMessage(),e); + } finally { + hbase = null; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java index 66772ac..d725614 100644 --- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java +++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java @@ -22,7 +22,5 @@ import org.apache.eagle.metadata.model.ApplicationEntity; import java.util.Collection; public abstract class ApplicationStatusUpdateService extends AbstractScheduledService { - public abstract void updateApplicationEntityStatus(Collection<ApplicationEntity> applicationEntities); - public abstract void updateApplicationEntityStatus(ApplicationEntity applicationEntity); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java index 0411b90..7c79b39 100644 --- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java +++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java @@ -19,6 +19,7 @@ package org.apache.eagle.service.client.impl; import com.sun.jersey.api.client.WebResource; import com.typesafe.config.Config; import com.typesafe.config.ConfigException; +import org.apache.commons.lang3.StringUtils; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; import org.apache.eagle.service.client.EagleServiceClientException; @@ -65,11 +66,13 @@ public class EagleServiceClientImpl extends EagleServiceBaseClient { try { return config.getInt(SERVICE_PORT_KEY); } catch (ConfigException.WrongType wrongType) { - return Integer.valueOf(config.getString(SERVICE_PORT_KEY)); + String portStr = config.getString(SERVICE_PORT_KEY); + if (StringUtils.isNotBlank(portStr)) { + return Integer.valueOf(portStr); + } } - } else { - return 9090; } + return 9090; } public EagleServiceClientImpl(String host, int port, String username, String password) { http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java b/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java index ac16b93..adfd2e2 100644 --- a/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java +++ b/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java @@ -19,7 +19,7 @@ package org.apache.eagle.service.client; import org.apache.eagle.service.hbase.EmbeddedHbase; public class ClientTestBase { - + //protected static EmbeddedServer server; protected static EmbeddedHbase hbase; http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-entity-base/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/pom.xml b/eagle-core/eagle-query/eagle-entity-base/pom.xml index f887714..fd2300c 100755 --- a/eagle-core/eagle-query/eagle-entity-base/pom.xml +++ b/eagle-core/eagle-query/eagle-entity-base/pom.xml @@ -31,20 +31,20 @@ <dependencies> <!-- put extcos dependency at the top for using asm 4.0 jar !--> - <dependency> - <groupId>net.sf.extcos</groupId> - <artifactId>extcos</artifactId> - <exclusions> - <exclusion> - <groupId>org.ow2.asm</groupId> - <artifactId>asm-all</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.ow2.asm</groupId> - <artifactId>asm-all</artifactId> - </dependency> + <!--<dependency>--> + <!--<groupId>net.sf.extcos</groupId>--> + <!--<artifactId>extcos</artifactId>--> + <!--<exclusions>--> + <!--<exclusion>--> + <!--<groupId>org.ow2.asm</groupId>--> + <!--<artifactId>asm-all</artifactId>--> + <!--</exclusion>--> + <!--</exclusions>--> + <!--</dependency>--> + <!--<dependency>--> + <!--<groupId>org.ow2.asm</groupId>--> + <!--<artifactId>asm-all</artifactId>--> + <!--</dependency>--> <dependency> <groupId>org.apache.eagle</groupId> <artifactId>eagle-common</artifactId> http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java index 7065cbe..8ccee87 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java @@ -16,55 +16,67 @@ */ package org.apache.eagle.log.entity.repo; -import java.util.Collection; -import java.util.Map; -import java.util.Set; - +import org.apache.commons.lang3.time.StopWatch; +import org.apache.eagle.common.utils.ReflectionsHelper; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.meta.EntitySerDeser; -import net.sf.extcos.ComponentQuery; -import net.sf.extcos.ComponentScanner; - import org.apache.eagle.log.entity.meta.EntityDefinitionManager; +import org.apache.eagle.log.entity.meta.EntitySerDeser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; +import java.util.Map; + public final class EntityRepositoryScanner { - private static final Logger LOG = LoggerFactory.getLogger(EntityRepositoryScanner.class); + private static final Logger LOG = LoggerFactory.getLogger(EntityRepositoryScanner.class); - public static void scan() throws InstantiationException, IllegalAccessException { - // TODO currently extcos 0.3b doesn't support to search packages like "com.*.eagle.*", "org.*.eagle.*". However 0.4b depends on asm-all version 4.0, which is - // conflicted with jersey server 1.8. We should fix it later - LOG.info("Scanning all entity repositories with pattern \"org.apache.eagle.*\""); - final ComponentScanner scanner = new ComponentScanner(); - final Set<Class<?>> classes = scanner.getClasses(new EntityRepoScanQuery() ); - for (Class<?> entityClass : classes) { - LOG.info("Processing entity repository: " + entityClass.getName()); - if (EntityRepository.class.isAssignableFrom(entityClass)) { - EntityRepository repo = (EntityRepository)entityClass.newInstance(); - addRepo(repo); - } - } - } + // public static void scan() throws InstantiationException, IllegalAccessException { + // // TODO currently extcos 0.3b doesn't support to search packages like "com.*.eagle.*", "org.*.eagle.*". However 0.4b depends on asm-all version 4.0, which is + // // conflicted with jersey server 1.8. We should fix it later + // LOG.info("Scanning all entity repositories with pattern \"org.apache.eagle.*\""); + // final ComponentScanner scanner = new ComponentScanner(); + // final Set<Class<?>> classes = scanner.getClasses(new EntityRepoScanQuery() ); + // for (Class<?> entityClass : classes) { + // LOG.info("Processing entity repository: " + entityClass.getName()); + // if (EntityRepository.class.isAssignableFrom(entityClass)) { + // EntityRepository repo = (EntityRepository)entityClass.newInstance(); + // addRepo(repo); + // } + // } + // } - private static void addRepo(EntityRepository repo) { - final Map<Class<?>, EntitySerDeser<?>> serDeserMap = repo.getSerDeserMap(); - for (Map.Entry<Class<?>, EntitySerDeser<?>> entry : serDeserMap.entrySet()) { - EntityDefinitionManager.registerSerDeser(entry.getKey(), entry.getValue()); - } - final Collection<Class<? extends TaggedLogAPIEntity>> entityClasses = repo.getEntitySet(); - for (Class<? extends TaggedLogAPIEntity> clazz : entityClasses) { - EntityDefinitionManager.registerEntity(clazz); - } - } + public static void scan() throws IllegalAccessException, InstantiationException { + LOG.info("Scanning all entity repositories"); + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + for (Class<? extends EntityRepository> entityRepoClass : ReflectionsHelper.getInstance().getSubTypesOf(EntityRepository.class)) { + if (EntityRepository.class.isAssignableFrom(entityRepoClass)) { + EntityRepository repo = entityRepoClass.newInstance(); + addRepo(repo); + } + } + stopWatch.stop(); + LOG.info("Finished scanning entity repositories in {} ms", stopWatch.getTime()); + } - public static class EntityRepoScanQuery extends ComponentQuery { + private static void addRepo(EntityRepository repo) { + final Map<Class<?>, EntitySerDeser<?>> serDeserMap = repo.getSerDeserMap(); + for (Map.Entry<Class<?>, EntitySerDeser<?>> entry : serDeserMap.entrySet()) { + EntityDefinitionManager.registerSerDeser(entry.getKey(), entry.getValue()); + } + final Collection<Class<? extends TaggedLogAPIEntity>> entityClasses = repo.getEntitySet(); + for (Class<? extends TaggedLogAPIEntity> clazz : entityClasses) { + EntityDefinitionManager.registerEntity(clazz); + } + } - @Override - protected void query() { - select().from("org.apache.eagle").returning( - allExtending(EntityRepository.class)); - } - } + // public static class EntityRepoScanQuery extends ComponentQuery { + // + // @Override + // protected void query() { + // select().from("org.apache.eagle").returning( + // allExtending(EntityRepository.class)); + // } + // } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java index 1e9e6cb..33aee32 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java @@ -26,19 +26,24 @@ import org.apache.eagle.log.entity.test.TestLogAPIEntity; import org.apache.eagle.query.parser.EagleQueryParser; import org.apache.eagle.service.hbase.TestHBaseBase; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; public class TestGenericEntityIndexStreamReader extends TestHBaseBase { - @Test - public void testUniqueIndexRead() throws Exception { + @BeforeClass + public static void createTable() throws IOException, IllegalAccessException, InstantiationException { EntityDefinition entityDefinition = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class); hbase.createTable(entityDefinition.getTable(), entityDefinition.getColumnFamily()); + } + @Test + public void testUniqueIndexRead() throws Exception { EntityDefinitionManager.registerEntity(TestLogAPIEntity.class); final EntityDefinition ed = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class); @@ -95,7 +100,6 @@ public class TestGenericEntityIndexStreamReader extends TestHBaseBase { indexReader = new UniqueIndexStreamReader(indexDef, condition); batchReader = new GenericEntityBatchReader(indexReader); entities = batchReader.read(); - hbase.deleteTable(entityDefinition.getTable()); Assert.assertNotNull(entities); Assert.assertTrue(entities.isEmpty()); } @@ -103,7 +107,7 @@ public class TestGenericEntityIndexStreamReader extends TestHBaseBase { @Test public void testNonClusterIndexRead() throws Exception { EntityDefinition entityDefinition = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class); - hbase.createTable(entityDefinition.getTable(), entityDefinition.getColumnFamily()); + // hbase.createTable(entityDefinition.getTable(), entityDefinition.getColumnFamily()); EntityDefinitionManager.registerEntity(TestLogAPIEntity.class); final EntityDefinition ed = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class); @@ -162,7 +166,7 @@ public class TestGenericEntityIndexStreamReader extends TestHBaseBase { indexReader = new NonClusteredIndexStreamReader(indexDef, condition); batchReader = new GenericEntityBatchReader(indexReader); entities = batchReader.read(); - hbase.deleteTable(entityDefinition.getTable()); + // hbase.deleteTable(entityDefinition.getTable()); Assert.assertNotNull(entities); Assert.assertTrue(entities.isEmpty()); }
