Repository: apex-malhar
Updated Branches:
  refs/heads/master aaa4464f0 -> 3cb30acda
APEXMALHAR-2158 Fixed duplication of emitted messages when the Kafka input operator is redeployed

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3cb30acd
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3cb30acd
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3cb30acd

Branch: refs/heads/master
Commit: 3cb30acda78c1993acb4458414d71013e8a097c9
Parents: aaa4464
Author: Chaitanya <[email protected]>
Authored: Fri Jul 22 15:43:41 2016 +0530
Committer: Chaitanya <[email protected]>
Committed: Fri Jul 22 15:43:41 2016 +0530

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       |  7 +-
 .../contrib/kafka/KafkaInputOperatorTest.java   | 69 ++++++++++++++++++--
 2 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3cb30acd/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 9a5917b..d4945ec 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -250,7 +250,12 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     consumer.create();
     // reset the offsets to checkpointed one
     if (consumer instanceof SimpleKafkaConsumer && !offsetStats.isEmpty()) {
-      ((SimpleKafkaConsumer)consumer).resetOffset(offsetStats);
+      Map<KafkaPartition, Long> currentOffsets = new HashMap<>();
+      // Increment the offsets and set it to consumer
+      for (Map.Entry<KafkaPartition, Long> e: offsetStats.entrySet()) {
+        currentOffsets.put(e.getKey(), e.getValue() + 1);
+      }
+      ((SimpleKafkaConsumer)consumer).resetOffset(currentOffsets);
     }
     this.context = context;
     operatorId = context.getId();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3cb30acd/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index 27235f5..e4a4dec 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -289,7 +290,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
     new Thread(p).start();
 
-    KafkaSinglePortStringInputOperator operator = createAndDeployOperator();
+    KafkaSinglePortStringInputOperator operator = createAndDeployOperator(true);
     latch.await(4000, TimeUnit.MILLISECONDS);
     operator.beginWindow(1);
     operator.emitTuples();
@@ -303,7 +304,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
     operator.teardown();
     operator.deactivate();
-    operator = createAndDeployOperator();
+    operator = createAndDeployOperator(true);
     Assert.assertEquals("largest recovery window", 2,
         operator.getWindowDataManager().getLargestRecoveryWindow());
     operator.beginWindow(1);
@@ -324,9 +325,57 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     operator.deactivate();
   }
 
-  private KafkaSinglePortStringInputOperator createAndDeployOperator()
+  @Test
+  public void testRecoveryAndExactlyOnce() throws Exception
   {
+    int totalCount = 1500;
+
+    // initial the latch for this test
+    latch = new CountDownLatch(50);
+    // Start producer
+    KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC);
+    p.setSendCount(totalCount);
+    new Thread(p).start();
+
+    KafkaSinglePortStringInputOperator operator = createAndDeployOperator(false);
+    latch.await(4000, TimeUnit.MILLISECONDS);
+    operator.beginWindow(1);
+    operator.emitTuples();
+    operator.endWindow();
+    operator.beginWindow(2);
+    operator.emitTuples();
+    operator.endWindow();
+    operator.checkpointed(2);
+    operator.committed(2);
+    Map<KafkaPartition, Long> offsetStats = operator.offsetStats;
+    int collectedTuplesAfterCheckpoint = testMeta.sink.collectedTuples.size();
+    //failure and then re-deployment of operator
+    testMeta.sink.collectedTuples.clear();
+    operator.teardown();
+    operator.deactivate();
+    operator = createOperator(false);
+    operator.offsetStats = offsetStats;
+    operator.setup(testMeta.context);
+    operator.activate(testMeta.context);
+    latch.await(4000, TimeUnit.MILLISECONDS);
+    // Emiting data after all recovery windows are replayed
+    operator.beginWindow(3);
+    operator.emitTuples();
+    operator.endWindow();
+    operator.beginWindow(4);
+    operator.emitTuples();
+    operator.endWindow();
+    latch.await(3000, TimeUnit.MILLISECONDS);
+
+    Assert.assertEquals("Total messages collected ", totalCount - collectedTuplesAfterCheckpoint + 1, testMeta.sink.collectedTuples.size());
+    testMeta.sink.collectedTuples.clear();
+    operator.teardown();
+    operator.deactivate();
+  }
+
+  private KafkaSinglePortStringInputOperator createOperator(boolean isIdempotency)
+  {
     Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
     attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
     attributeMap.put(Context.DAGContext.APPLICATION_PATH, testMeta.baseDir);
@@ -339,9 +388,12 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     consumer.setTopic(TEST_TOPIC);
     consumer.setInitialOffset("earliest");
 
-    FSWindowDataManager storageManager = new FSWindowDataManager();
-    storageManager.setRecoveryPath(testMeta.recoveryDir);
-    testMeta.operator.setWindowDataManager(storageManager);
+    if (isIdempotency) {
+      FSWindowDataManager storageManager = new FSWindowDataManager();
+      storageManager.setRecoveryPath(testMeta.recoveryDir);
+      testMeta.operator.setWindowDataManager(storageManager);
+    }
+
     testMeta.operator.setConsumer(consumer);
     testMeta.operator.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
     testMeta.operator.setMaxTuplesPerWindow(500);
@@ -356,7 +408,12 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     testMeta.sink = new CollectorTestSink<Object>();
    testMeta.operator.outputPort.setSink(testMeta.sink);
     operator.outputPort.setSink(testMeta.sink);
+    return operator;
+  }
 
+  private KafkaSinglePortStringInputOperator createAndDeployOperator(boolean isIdempotency)
+  {
+    KafkaSinglePortStringInputOperator operator = createOperator(isIdempotency);
     operator.setup(testMeta.context);
     operator.activate(testMeta.context);
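
Note on the fix: judging from the change to AbstractKafkaInputOperator above, offsetStats appears to record, per KafkaPartition, the offset of the last message already consumed at checkpoint time. Seeking the consumer back to that exact offset on redeploy re-fetches and re-emits that message from every partition, which is the duplication this commit removes. A minimal sketch of the resulting restart rule, assuming SimpleKafkaConsumer.resetOffset(Map<KafkaPartition, Long>) seeks each partition to the supplied offset:

    // offsetStats: checkpointed offset of the LAST message already consumed per partition
    // (assumption inferred from the +1 in the fix, not stated in the commit).
    // Resuming at offset + 1 starts each partition at the first message not yet emitted,
    // so redeployment no longer duplicates the last consumed message.
    Map<KafkaPartition, Long> currentOffsets = new HashMap<>();
    for (Map.Entry<KafkaPartition, Long> e : offsetStats.entrySet()) {
      currentOffsets.put(e.getKey(), e.getValue() + 1);
    }
    ((SimpleKafkaConsumer) consumer).resetOffset(currentOffsets);

The new testRecoveryAndExactlyOnce test exercises exactly this path: it checkpoints after window 2, tears the operator down, restores offsetStats on a fresh operator without a WindowDataManager, and asserts that the total number of collected tuples matches the produced count rather than exceeding it.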
