This is an automated email from the ASF dual-hosted git repository. sandesh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push: new 52adf3e APEXMALHAR-2518 Terminating operator execution when there is an error in commit offset processing (#644) 52adf3e is described below commit 52adf3e97c459de39683b85ff96ebb362b9d4ca7 Author: Pramod Immaneni <pra...@datatorrent.com> AuthorDate: Fri Jul 14 16:58:32 2017 -0700 APEXMALHAR-2518 Terminating operator execution when there is an error in commit offset processing (#644) --- .../apex/malhar/kafka/AbstractKafkaInputOperator.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java index 40747eb..7f287bc 100644 --- a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java +++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java @@ -27,13 +27,11 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; - import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -48,8 +46,8 @@ import org.apache.log4j.LogManager; import com.google.common.base.Joiner; import com.google.common.base.Splitter; +import com.google.common.base.Throwables; import com.google.common.collect.Iterables; - import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context; import com.datatorrent.api.InputOperator; @@ -186,6 +184,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, private WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager(); + private transient volatile Throwable consumerError; + /** * Creates the Wrapper consumer object * It maintains consumer thread and store messages in a queue @@ -268,6 +268,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, } } emitCount += count; + processConsumerError(); } protected abstract void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message); @@ -337,6 +338,14 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, windowDataManager.teardown(); } + protected void processConsumerError() + { + if (consumerError != null) { + logger.error("Error in consumer, terminating"); + throw Throwables.propagate(consumerError); + } + } + private void initPartitioner() { if (partitioner == null) { @@ -403,12 +412,14 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, * @param map * @param e */ + @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if (logger.isDebugEnabled()) { logger.debug("Commit offsets complete {} ", Joiner.on(';').withKeyValueSeparator("=").join(map)); } if (e != null) { + consumerError = e; logger.warn("Exceptions in committing offsets {} : {} ", Joiner.on(';').withKeyValueSeparator("=").join(map), e); } -- To stop receiving notification emails like this one, please contact ['"commits@apex.apache.org" <commits@apex.apache.org>'].