Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 b98bae305 -> 365237d5c
MLHR-1934 #comment reset offset to earliest/latest if current offset is out of range

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/365237d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/365237d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/365237d5

Branch: refs/heads/devel-3
Commit: 365237d5c5c8a4af36a26df997b8ab710d794b5f
Parents: b98bae3
Author: Siyuan Hua <[email protected]>
Authored: Thu Dec 10 00:56:36 2015 -0800
Committer: Thomas Weise <[email protected]>
Committed: Thu Dec 10 13:11:09 2015 -0800

----------------------------------------------------------------------
 .../contrib/kafka/SimpleKafkaConsumer.java | 14 ++++++-
 .../contrib/kafka/OffsetManagerTest.java   | 41 ++++++++++++++++----
 2 files changed, 45 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/365237d5/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
index 84d7a10..58ef95f 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
@@ -159,10 +159,20 @@ public class SimpleKafkaConsumer extends KafkaConsumer
         FetchResponse fetchResponse = ksc.fetch(req);
         for (Iterator<KafkaPartition> iterator = kpS.iterator(); iterator.hasNext();) {
           KafkaPartition kafkaPartition = iterator.next();
-          if (fetchResponse.hasError() && fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()) != ErrorMapping.NoError()) {
+          short errorCode = fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId());
+          if (fetchResponse.hasError() && errorCode != ErrorMapping.NoError()) {
             // Temporarily kick out partition(s) that hit an error when fetching from this broker
             // The monitor thread will find out which broker they belong to
-            logger.warn("Error when consuming topic {} from broker {} with error code {} ", kafkaPartition, broker, fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()));
+            logger.warn("Error when consuming topic {} from broker {} with error {} ", kafkaPartition, broker,
+                ErrorMapping.exceptionFor(errorCode));
+            if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
+              long seekTo = consumer.initialOffset.toLowerCase().equals("earliest") ? OffsetRequest.EarliestTime()
+                  : OffsetRequest.LatestTime();
+              seekTo = KafkaMetadataUtil.getLastOffset(ksc, consumer.topic, kafkaPartition.getPartitionId(), seekTo, clientName);
+              logger.warn("Offset out of range error, reset offset to {}", seekTo);
+              consumer.offsetTrack.put(kafkaPartition, seekTo);
+              continue;
+            }
             iterator.remove();
             consumer.partitionToBroker.remove(kafkaPartition);
             consumer.stats.updatePartitionStats(kafkaPartition, -1, "");

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/365237d5/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
index 1374a1e..7b36ea8 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
@@ -63,6 +63,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
   static final int totalCount = 100;
   static CountDownLatch latch;
   static final String OFFSET_FILE = ".offset";
+  static long initialPos = 10l;

   public static class TestOffsetManager implements OffsetManager{

@@ -85,8 +86,8 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     {
       KafkaPartition kp0 = new KafkaPartition(TEST_TOPIC, 0);
       KafkaPartition kp1 = new KafkaPartition(TEST_TOPIC, 1);
-      offsets.put(kp0, 10l);
-      offsets.put(kp1, 10l);
+      offsets.put(kp0, initialPos);
+      offsets.put(kp1, initialPos);
       return offsets;
     }

@@ -120,7 +121,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
       for (long entry : offsets.values()) {
         count += entry;
       }
-      if (count == totalCount + 2) {
+      if (count == totalCount) {
         // wait until all offsets add up to totalCount messages
         latch.countDown();
       }

@@ -188,10 +189,34 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
   @Test
   public void testSimpleConsumerUpdateOffsets() throws Exception
   {
+    initialPos = 10l;
     // Create template simple consumer
     try{
       SimpleKafkaConsumer consumer = new SimpleKafkaConsumer();
-      testPartitionableInputOperator(consumer);
+      testPartitionableInputOperator(consumer, totalCount - (int)initialPos - (int)initialPos);
     } finally {
       // clean test offset file
       cleanFile();
     }
   }

+  /**
+   * Test OffsetManager updating offsets in the simple consumer.
+   *
+   * [Generate and send 100 messages to Kafka] ==> [wait until the offsets have been updated,
+   * or time out after 30s, which means the offsets were not updated]
+   *
+   * The initial offsets are invalid, so the consumer resets to earliest and gets all messages.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSimpleConsumerInvalidInitialOffsets() throws Exception
+  {
+    initialPos = 1000l;
+    // Create template simple consumer
+    try{
+      SimpleKafkaConsumer consumer = new SimpleKafkaConsumer();
+      testPartitionableInputOperator(consumer, totalCount);
+    } finally {
+      // clean test offset file
+      cleanFile();

@@ -207,7 +232,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     }
   }

-  public void testPartitionableInputOperator(KafkaConsumer consumer) throws Exception{
+  public void testPartitionableInputOperator(KafkaConsumer consumer, int expectedCount) throws Exception{
     // Set to 3 to make sure END_TUPLE from both partitions is received and the offsets have been updated
     latch = new CountDownLatch(3);

@@ -241,7 +266,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     consumer.setTopic(TEST_TOPIC);
     //set the zookeeper list used to initialize the partition
     SetMultimap<String, String> zookeeper = HashMultimap.create();
-    String zks = KafkaPartition.DEFAULT_CLUSTERID + "::localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0];
+    String zks = "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0];
     consumer.setZookeeper(zks);
     consumer.setInitialOffset("earliest");

@@ -260,11 +285,11 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     lc.runAsync();

     // Wait 30s for the consumer to finish consuming all the messages and for the offsets to be updated
-    assertTrue("TIMEOUT: 30s ", latch.await(30000, TimeUnit.MILLISECONDS));
+    assertTrue("TIMEOUT: 30s, collected " + collectedTuples + " tuples", latch.await(30000, TimeUnit.MILLISECONDS));

     // Check results
-    assertEquals("Tuple count", totalCount -10 -10, collectedTuples.size());
+    assertEquals("Tuple count", expectedCount, collectedTuples.size());
     logger.debug(String.format("Number of emitted tuples: %d", collectedTuples.size()));
     p.close();
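The reset path above relies on KafkaMetadataUtil.getLastOffset() to turn the symbolic OffsetRequest.EarliestTime() / OffsetRequest.LatestTime() value into a concrete offset for the partition. Below is a minimal sketch of that lookup, assuming the standard Kafka 0.8 SimpleConsumer offset-request pattern; the class name OffsetLookup and the -1 error convention are illustrative, not necessarily what KafkaMetadataUtil actually does:

import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetLookup
{
  /**
   * Ask the broker for a partition offset; whichTime is
   * kafka.api.OffsetRequest.EarliestTime() or LatestTime().
   */
  public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
      long whichTime, String clientName)
  {
    TopicAndPartition tp = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
        new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    // Request a single offset at/before the given (symbolic) time
    requestInfo.put(tp, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
        requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (response.hasError()) {
      return -1; // let the caller decide how to handle a failed lookup
    }
    return response.offsets(topic, partition)[0];
  }
}

With the offset resolved, the fix only has to overwrite the tracked position (consumer.offsetTrack.put(kafkaPartition, seekTo)) and skip the partition for this fetch cycle; the next FetchRequest is built from the corrected offset, so the OffsetOutOfRange error does not repeat.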

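The new testSimpleConsumerInvalidInitialOffsets test exercises this path by seeding offsets past the end of the log (initialPos = 1000 while only 100 messages are produced), so the first fetch fails with OffsetOutOfRange and the consumer falls back to its "earliest" setting. A stripped-down version of such a seeder, assuming the OffsetManager contract of loadInitialOffsets()/updateOffsets() that TestOffsetManager above implements:

import java.util.HashMap;
import java.util.Map;

import com.datatorrent.contrib.kafka.KafkaPartition;
import com.datatorrent.contrib.kafka.OffsetManager;

/**
 * Seeds deliberately out-of-range offsets so the consumer has to fall
 * back to its initialOffset policy and re-deliver everything.
 */
public class OutOfRangeOffsetManager implements OffsetManager
{
  private final String topic;
  private final long seedOffset; // e.g. 1000 when only 100 messages exist

  public OutOfRangeOffsetManager(String topic, long seedOffset)
  {
    this.topic = topic;
    this.seedOffset = seedOffset;
  }

  @Override
  public Map<KafkaPartition, Long> loadInitialOffsets()
  {
    Map<KafkaPartition, Long> offsets = new HashMap<KafkaPartition, Long>();
    offsets.put(new KafkaPartition(topic, 0), seedOffset);
    offsets.put(new KafkaPartition(topic, 1), seedOffset);
    return offsets;
  }

  @Override
  public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions)
  {
    // A real implementation would persist these; the test instead sums
    // them and counts down a latch once all messages are accounted for.
  }
}

Because the test configures consumer.setInitialOffset("earliest"), the reset lands at the beginning of the log and all 100 messages are re-delivered, which is exactly what assertEquals("Tuple count", expectedCount, collectedTuples.size()) verifies with expectedCount == totalCount.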