Repository: incubator-eagle Updated Branches: refs/heads/master 976edcd86 -> f833e9831
[MINOR] Add makeSSS and makeSRS in RouteSpec - Add makeSSS and makeSRS in RouteSpec Author: r7raul1984 <tangji...@yhd.com> Closes #699 from r7raul1984/ROUTESPEC-MINOR. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f833e983 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f833e983 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f833e983 Branch: refs/heads/master Commit: f833e9831f473660a084327f67418197b5f00d02 Parents: 976edcd Author: r7raul1984 <tangji...@yhd.com> Authored: Tue Dec 6 10:29:05 2016 +0800 Committer: wujinhu <wujinhu...@126.com> Committed: Tue Dec 6 10:29:05 2016 +0800 ---------------------------------------------------------------------- .../alert/coordination/model/RouterSpec.java | 25 ++++++++++++++++++++ .../alert/engine/runner/StreamRouterBolt.java | 15 ++---------- 2 files changed, 27 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f833e983/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java index fc13c56..b3877c3 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java @@ -17,9 +17,13 @@ package org.apache.eagle.alert.coordination.model; import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class RouterSpec { @@ -66,6 +70,27 @@ public class RouterSpec { this.routerSpecs = routerSpecs; } + public Map<StreamPartition, List<StreamRouterSpec>> makeSRS() { + Map<StreamPartition, List<StreamRouterSpec>> newSRS = new HashMap<>(); + this.getRouterSpecs().forEach(t -> { + if (!newSRS.containsKey(t.getPartition())) { + newSRS.put(t.getPartition(), new ArrayList<>()); + } + newSRS.get(t.getPartition()).add(t); + }); + return newSRS; + } + + public Map<StreamPartition, StreamSortSpec> makeSSS() { + Map<StreamPartition, StreamSortSpec> newSSS = new HashMap<>(); + this.getRouterSpecs().forEach(t -> { + if (t.getPartition().getSortSpec() != null) { + newSSS.put(t.getPartition(), t.getPartition().getSortSpec()); + } + }); + return newSSS; + } + @Override public String toString() { return String.format("version:%s-topo:%s, boltSpec:%s", version, topologyName, routerSpecs); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f833e983/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java index 29ee771..e37b680 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java @@ -105,12 +105,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter sanityCheck(spec); // figure out added, removed, modified StreamSortSpec - Map<StreamPartition, StreamSortSpec> newSSS = new HashMap<>(); - spec.getRouterSpecs().forEach(t -> { - if (t.getPartition().getSortSpec() != null) { - newSSS.put(t.getPartition(), t.getPartition().getSortSpec()); - } - }); + Map<StreamPartition, StreamSortSpec> newSSS = spec.makeSSS(); Set<StreamPartition> newStreamIds = newSSS.keySet(); Set<StreamPartition> cachedStreamIds = cachedSSS.keySet(); @@ -138,13 +133,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter cachedSSS = newSSS; // figure out added, removed, modified StreamRouterSpec - Map<StreamPartition, List<StreamRouterSpec>> newSRS = new HashMap<>(); - spec.getRouterSpecs().forEach(t -> { - if (!newSRS.containsKey(t.getPartition())) { - newSRS.put(t.getPartition(), new ArrayList<StreamRouterSpec>()); - } - newSRS.get(t.getPartition()).add(t); - }); + Map<StreamPartition, List<StreamRouterSpec>> newSRS = spec.makeSRS(); Set<StreamPartition> newStreamPartitions = newSRS.keySet(); Set<StreamPartition> cachedStreamPartitions = cachedSRS.keySet();