Repository: eagle
Updated Branches:
  refs/heads/master 3011fdf84 -> 75586658b
[EAGLE-926] Alert engine fails to support more than one stream consumption from an application https://issues.apache.org/jira/browse/EAGLE-926 Author: Zhao, Qingwen <[email protected]> Closes #841 from qingwen220/EAGLE-926. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/75586658 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/75586658 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/75586658 Branch: refs/heads/master Commit: 75586658ba1aa202ee0ebec8c4cc52546cb0aae2 Parents: 3011fdf Author: Zhao, Qingwen <[email protected]> Authored: Fri Feb 24 11:50:35 2017 +0800 Committer: Zhao, Qingwen <[email protected]> Committed: Fri Feb 24 11:50:35 2017 +0800 ---------------------------------------------------------------------- .../engine/coordinator/StreamDefinition.java | 44 +++++++++++++------- .../alert/engine/runner/AlertPublisherBolt.java | 2 +- .../eagle/app/service/ApplicationAction.java | 7 ++-- 3 files changed, 34 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/75586658/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java index 9512f1a..fd5d5a6 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java @@ -52,6 +52,9 @@ public class 
StreamDefinition implements Serializable { // Stream data source ID private String dataSource; + // + private String streamSource; + // Tenant (Site) ID private String siteId; @@ -70,14 +73,15 @@ public class StreamDefinition implements Serializable { @Override public int hashCode() { return new HashCodeBuilder() - .append(this.streamId) - .append(this.description) - .append(this.validate) - .append(this.timeseries) - .append(this.dataSource) - .append(this.siteId) - .append(this.columns) - .build(); + .append(this.streamId) + .append(this.description) + .append(this.validate) + .append(this.timeseries) + .append(this.dataSource) + .append(streamSource) + .append(this.siteId) + .append(this.columns) + .build(); } @Override @@ -88,13 +92,15 @@ public class StreamDefinition implements Serializable { if (!(obj instanceof StreamDefinition)) { return false; } - return Objects.equals(this.streamId, ((StreamDefinition) obj).streamId) - && Objects.equals(this.description, ((StreamDefinition) obj).description) - && Objects.equals(this.validate, ((StreamDefinition) obj).validate) - && Objects.equals(this.timeseries, ((StreamDefinition) obj).timeseries) - && Objects.equals(this.dataSource, ((StreamDefinition) obj).dataSource) - && Objects.equals(this.siteId, ((StreamDefinition) obj).siteId) - && CollectionUtils.isEqualCollection(this.columns, ((StreamDefinition) obj).columns); + StreamDefinition streamDefinition = (StreamDefinition) obj; + return Objects.equals(this.streamId, streamDefinition.streamId) + && Objects.equals(this.description, streamDefinition.description) + && Objects.equals(this.validate, streamDefinition.validate) + && Objects.equals(this.timeseries, streamDefinition.timeseries) + && Objects.equals(this.dataSource, streamDefinition.dataSource) + && Objects.equals(this.streamSource, streamDefinition.streamSource) + && Objects.equals(this.siteId, streamDefinition.siteId) + && CollectionUtils.isEqualCollection(this.columns, streamDefinition.columns); } public 
String getStreamId() { @@ -166,6 +172,14 @@ public class StreamDefinition implements Serializable { this.siteId = siteId; } + public String getStreamSource() { + return streamSource; + } + + public void setStreamSource(String streamSource) { + this.streamSource = streamSource; + } + public StreamDefinition copy() { StreamDefinition copied = new StreamDefinition(); copied.setColumns(this.getColumns()); http://git-wip-us.apache.org/repos/asf/eagle/blob/75586658/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java index 2b57e96..44a5fe9 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java @@ -189,7 +189,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli StreamDefinition sd = alertPublisherBolt.streamDefinitionMap.get(inputStreamId); if (sd != null) { extraData.put(AlertPublishEvent.SITE_ID_KEY, sd.getSiteId()); - appIds.add(sd.getDataSource()); + appIds.add(sd.getStreamSource()); } } extraData.put(AlertPublishEvent.APP_IDS_KEY, appIds); http://git-wip-us.apache.org/repos/asf/eagle/blob/75586658/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java index a502f81..3733e4d 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java @@ -142,7 +142,7 @@ public class ApplicationAction implements Serializable { KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSinkConfig(); Kafka2TupleMetadata datasource = new Kafka2TupleMetadata(); datasource.setType("KAFKA"); - datasource.setName(metadata.getAppId()); + datasource.setName(streamDesc.getStreamId()); datasource.setTopic(kafkaCfg.getTopicId()); datasource.setSchemeCls(JsonScheme.class.getCanonicalName()); datasource.setProperties(new HashMap<>()); @@ -164,7 +164,8 @@ public class ApplicationAction implements Serializable { alertMetadataService.addDataSource(datasource); StreamDefinition sd = streamDesc.getSchema(); - sd.setDataSource(metadata.getAppId()); + sd.setDataSource(streamDesc.getStreamId()); + sd.setStreamSource(metadata.getAppId()); alertMetadataService.createStream(streamDesc.getSchema()); } } @@ -177,7 +178,7 @@ public class ApplicationAction implements Serializable { } // iterate each stream descriptor and create alert datasource for each for (StreamDesc streamDesc : metadata.getStreams()) { - alertMetadataService.removeDataSource(metadata.getAppId()); + alertMetadataService.removeDataSource(streamDesc.getStreamId()); alertMetadataService.removeStream(streamDesc.getStreamId()); } }
