This is an automated email from the ASF dual-hosted git repository.

sandesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git


The following commit(s) were added to refs/heads/master by this push:
     new 52adf3e  APEXMALHAR-2518 Terminating operator execution when there is 
an error in commit offset processing (#644)
52adf3e is described below

commit 52adf3e97c459de39683b85ff96ebb362b9d4ca7
Author: Pramod Immaneni <pra...@datatorrent.com>
AuthorDate: Fri Jul 14 16:58:32 2017 -0700

    APEXMALHAR-2518 Terminating operator execution when there is an error in 
commit offset processing (#644)
---
 .../apex/malhar/kafka/AbstractKafkaInputOperator.java   | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git 
a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
 
b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 40747eb..7f287bc 100644
--- 
a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ 
b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -27,13 +27,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -48,8 +46,8 @@ import org.apache.log4j.LogManager;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
-
 import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.InputOperator;
@@ -186,6 +184,8 @@ public abstract class AbstractKafkaInputOperator implements 
InputOperator,
 
   private WindowDataManager windowDataManager = new 
WindowDataManager.NoopWindowDataManager();
 
+  private transient volatile Throwable consumerError;
+
   /**
    * Creates the Wrapper consumer object
    * It maintains consumer thread and store messages in a queue
@@ -268,6 +268,7 @@ public abstract class AbstractKafkaInputOperator implements 
InputOperator,
       }
     }
     emitCount += count;
+    processConsumerError();
   }
 
   protected abstract void emitTuple(String cluster, ConsumerRecord<byte[], 
byte[]> message);
@@ -337,6 +338,14 @@ public abstract class AbstractKafkaInputOperator 
implements InputOperator,
     windowDataManager.teardown();
   }
 
+  protected void processConsumerError()
+  {
+    if (consumerError != null) {
+      logger.error("Error in consumer, terminating");
+      throw Throwables.propagate(consumerError);
+    }
+  }
+
   private void initPartitioner()
   {
     if (partitioner == null) {
@@ -403,12 +412,14 @@ public abstract class AbstractKafkaInputOperator 
implements InputOperator,
    * @param map
    * @param e
    */
+  @Override
   public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception 
e)
   {
     if (logger.isDebugEnabled()) {
       logger.debug("Commit offsets complete {} ", 
Joiner.on(';').withKeyValueSeparator("=").join(map));
     }
     if (e != null) {
+      consumerError = e;
       logger.warn("Exceptions in committing offsets {} : {} ",
           Joiner.on(';').withKeyValueSeparator("=").join(map), e);
     }

-- 
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <commits@apex.apache.org>'].

Reply via email to