[EAGLE-1014] add exception handling in CorrelationSpout.java

https://issues.apache.org/jira/browse/EAGLE-1014

Author: Zhao, Qingwen <[email protected]>

Closes #927 from qingwen220/EAGLE-1014.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/eaad6cf7
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/eaad6cf7
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/eaad6cf7

Branch: refs/heads/branch-0.5
Commit: eaad6cf74c896a97a061a3f600a0ec64a95c0963
Parents: 84d40ae
Author: Zhao, Qingwen <[email protected]>
Authored: Tue May 2 11:29:46 2017 -0700
Committer: Jay <[email protected]>
Committed: Tue May 2 11:29:46 2017 -0700

----------------------------------------------------------------------
 .../eagle/alert/engine/spout/CorrelationSpout.java    | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/eaad6cf7/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index e9ee892..4338964 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -172,6 +172,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
                 wrapper.nextTuple();
             } catch (Exception e) {
                 LOG.error("unexpected exception is caught: {}", 
e.getMessage(), e);
+                collector.reportError(e);
             }
 
         }
@@ -256,9 +257,14 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
                 LOG.warn(MessageFormat.format("try to create new topic {0}, 
but found in the active spout list, this may indicate some inconsistency", 
topic));
                 continue;
             }
-            KafkaSpoutWrapper newWrapper = 
createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config),
-                    conf, context, collector, topic, newSchemaName.get(topic), 
newMeta, sds);
-            newKafkaSpoutList.put(topic, newWrapper);
+            try {
+                KafkaSpoutWrapper newWrapper = 
createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config),
+                        conf, context, collector, topic, 
newSchemaName.get(topic), newMeta, sds);
+                newKafkaSpoutList.put(topic, newWrapper);
+            } catch (Exception e) {
+                LOG.error("fail to create KafkaSpoutWrapper for topic {} due 
to {}", topic, e.getMessage(), e);
+                collector.reportError(e);
+            }
         }
         // iterate remove topics and then close KafkaSpout
         for (String topic : removeTopics) {
@@ -285,6 +291,8 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
         this.cachedSpoutSpec = newMeta;
         this.kafkaSpoutList = newKafkaSpoutList;
         this.sds = sds;
+
+        LOG.info("after CorrelationSpout reloads, {} kafkaSpouts are generated 
for {} topics", kafkaSpoutList.size(), topics.size());
     }
 
     /**

Reply via email to