Repository: eagle Updated Branches: refs/heads/master 287a1c109 -> c75eadd44
EAGLE-943: Return topology builder to enable flink-storm topology build. Author: ralphsu This closes #855 Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/c75eadd4 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/c75eadd4 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/c75eadd4 Branch: refs/heads/master Commit: c75eadd44d097df3e6ff830ddaebadbf7751a637 Parents: 287a1c1 Author: Ralph, Su <[email protected]> Authored: Fri Mar 3 20:14:42 2017 +0800 Committer: Ralph, Su <[email protected]> Committed: Fri Mar 3 20:22:40 2017 +0800 ---------------------------------------------------------------------- .../eagle/alert/engine/UnitTopologyMain.java | 24 ++++++++++++++++---- .../alert/engine/runner/UnitTopologyRunner.java | 24 ++++++++++---------- 2 files changed, 32 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/c75eadd4/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java index 01b16b8..055032f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java @@ -19,17 +19,20 @@ package org.apache.eagle.alert.engine; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; import 
org.apache.eagle.alert.config.ZKConfig; import org.apache.eagle.alert.config.ZKConfigBuilder; import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService; import org.apache.eagle.alert.engine.runner.UnitTopologyRunner; + import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; + import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.DefaultParser; -import org.apache.commons.cli.Options; /** @@ -84,6 +87,19 @@ public class UnitTopologyMain { String topologyId = getTopologyName(config); ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId); + return new UnitTopologyRunner(changeNotifyService).buildTopology(topologyId, config).createTopology(); + } + + /** + * Returns a builder instead of the topology itself. This makes it possible to run storm-flink conversion. 
+ * + * @param config + * @return + */ + public static TopologyBuilder createTopologyBuilder(Config config) { + String topologyId = getTopologyName(config); + ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId); + return new UnitTopologyRunner(changeNotifyService).buildTopology(topologyId, config); } http://git-wip-us.apache.org/repos/asf/eagle/blob/c75eadd4/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java index 287d5db..3f06f66 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java @@ -19,12 +19,19 @@ package org.apache.eagle.alert.engine.runner; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + import org.apache.eagle.alert.coordination.model.internal.Topology; import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService; import org.apache.eagle.alert.engine.spout.CorrelationSpout; import org.apache.eagle.alert.utils.AlertConstants; import org.apache.eagle.alert.utils.StreamIdConversion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; @@ -33,17 +40,10 @@ import backtype.storm.topology.BoltDeclarer; import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; + import com.typesafe.config.Config; import com.typesafe.config.ConfigRenderOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - /** * By default * 1. one spout with multiple tasks @@ -106,7 +106,7 @@ public class UnitTopologyRunner { } stormConfig.setNumWorkers(numOfTotalWorkers); - StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config); + StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config).createTopology(); if (localMode) { LOG.info("Submitting as local mode"); @@ -143,7 +143,7 @@ public class UnitTopologyRunner { // Build Storm Topology // --------------------------- - public StormTopology buildTopology(String topologyId, + public TopologyBuilder buildTopology(String topologyId, int numOfSpoutTasks, int numOfRouterBolts, int numOfAlertBolts, @@ -208,10 +208,10 @@ public class UnitTopologyRunner { boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new Fields(AlertConstants.FIELD_0)); } - return builder.createTopology(); + return builder; } - public StormTopology buildTopology(String topologyId, Config config) { + public TopologyBuilder buildTopology(String topologyId, Config config) { int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
