Repository: incubator-eagle
Updated Branches:
  refs/heads/master 98dff2480 -> dcb2b5de1


[Fix]: Add simple support for a given MultiScheme instead of always wrapping it in 
SchemeAsMultiScheme

Author: ralphsu


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/dcb2b5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/dcb2b5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/dcb2b5de

Branch: refs/heads/master
Commit: dcb2b5de1e82d27293405bb03969715f357d5521
Parents: 98dff24
Author: Ralph, Su <suliang...@gmail.com>
Authored: Mon Sep 19 12:23:27 2016 -0700
Committer: Ralph, Su <suliang...@gmail.com>
Committed: Mon Sep 19 12:23:27 2016 -0700

----------------------------------------------------------------------
 .../alert/engine/spout/CorrelationSpout.java    | 29 ++++++++++++++------
 1 file changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcb2b5de/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index 9c04fa4..25c0607 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -18,6 +18,16 @@
  */
 package org.apache.eagle.alert.engine.spout;
 
+import backtype.storm.spout.MultiScheme;
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
 import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
@@ -29,14 +39,6 @@ import 
org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider
 import org.apache.eagle.alert.engine.serialization.Serializers;
 import org.apache.eagle.alert.utils.AlertConstants;
 import org.apache.eagle.alert.utils.StreamIdConversion;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.*;
@@ -331,7 +333,7 @@ public class CorrelationSpout extends BaseRichSpout 
implements SpoutSpecListener
             spoutConfig.startOffsetTime = 
config.getInt("spout.stormKafkaStartOffsetTime");
         }
 
-        spoutConfig.scheme = new 
SchemeAsMultiScheme(SchemeBuilder.buildFromClsName(schemeClsName, topic, conf));
+        spoutConfig.scheme = createMultiScheme(conf, topic, schemeClsName);
         KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, 
kafkaSpoutMetric);
         SpoutOutputCollectorWrapper collectorWrapper = new 
SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, 
numOfRouterBolts, sds, this.serializer);
         wrapper.open(conf, context, collectorWrapper);
@@ -342,6 +344,15 @@ public class CorrelationSpout extends BaseRichSpout 
implements SpoutSpecListener
         return wrapper;
     }
 
+    private MultiScheme createMultiScheme(Map conf, String topic, String 
schemeClsName) throws Exception {
+        Scheme scheme = SchemeBuilder.buildFromClsName(schemeClsName, topic, 
conf);
+        if (scheme instanceof MultiScheme) {
+            return (MultiScheme) scheme;
+        } else {
+            return new SchemeAsMultiScheme(scheme);
+        }
+    }
+
     @Override
     public StreamDefinition getStreamDefinition(String streamId) {
         return sds.get(streamId);

Reply via email to