Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/release-3.2 58bf4061e -> 1982341dc


MLHR-1934 #comment reset offset to earliest/latest if current offset is out of 
range


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1982341d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1982341d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1982341d

Branch: refs/heads/release-3.2
Commit: 1982341dc216404634a1f2c5fd2a9fdfcf7146ce
Parents: 58bf406
Author: Siyuan Hua <[email protected]>
Authored: Thu Dec 10 00:56:36 2015 -0800
Committer: Thomas Weise <[email protected]>
Committed: Thu Dec 10 13:07:04 2015 -0800

----------------------------------------------------------------------
 .../contrib/kafka/SimpleKafkaConsumer.java      | 14 ++++++-
 .../contrib/kafka/OffsetManagerTest.java        | 41 ++++++++++++++++----
 2 files changed, 45 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1982341d/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
index 84d7a10..58ef95f 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
@@ -159,10 +159,20 @@ public class SimpleKafkaConsumer extends KafkaConsumer
             FetchResponse fetchResponse = ksc.fetch(req);
             for (Iterator<KafkaPartition> iterator = kpS.iterator(); 
iterator.hasNext();) {
               KafkaPartition kafkaPartition = iterator.next();
-              if (fetchResponse.hasError() && 
fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()) != 
ErrorMapping.NoError()) {
+              short errorCode = fetchResponse.errorCode(consumer.topic, 
kafkaPartition.getPartitionId());
+              if (fetchResponse.hasError() && errorCode != 
ErrorMapping.NoError()) {
                 // Kick off partition(s) which has error when fetch from this 
broker temporarily 
                 // Monitor will find out which broker it goes in monitor thread
-                logger.warn("Error when consuming topic {} from broker {} with 
error code {} ", kafkaPartition, broker,  
fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()));
+                logger.warn("Error when consuming topic {} from broker {} with 
error {} ", kafkaPartition, broker,
+                  ErrorMapping.exceptionFor(errorCode));
+                if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
+                  long seekTo = 
consumer.initialOffset.toLowerCase().equals("earliest") ? 
OffsetRequest.EarliestTime()
+                    : OffsetRequest.LatestTime();
+                  seekTo = KafkaMetadataUtil.getLastOffset(ksc, 
consumer.topic, kafkaPartition.getPartitionId(), seekTo, clientName);
+                  logger.warn("Offset out of range error, reset offset to {}", 
seekTo);
+                  consumer.offsetTrack.put(kafkaPartition, seekTo);
+                  continue;
+                }
                 iterator.remove();
                 consumer.partitionToBroker.remove(kafkaPartition);
                 consumer.stats.updatePartitionStats(kafkaPartition, -1, "");

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1982341d/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java 
b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
index 1374a1e..7b36ea8 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
@@ -63,6 +63,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
   static final int totalCount = 100;
   static CountDownLatch latch;
   static final String OFFSET_FILE = ".offset";
+  static long initialPos = 10l;
 
 
   public static class TestOffsetManager implements OffsetManager{
@@ -85,8 +86,8 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     {
       KafkaPartition kp0 = new KafkaPartition(TEST_TOPIC, 0);
       KafkaPartition kp1 = new KafkaPartition(TEST_TOPIC, 1);
-      offsets.put(kp0, 10l);
-      offsets.put(kp1, 10l);
+      offsets.put(kp0, initialPos);
+      offsets.put(kp1, initialPos);
       return offsets;
     }
 
@@ -120,7 +121,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
         for (long entry : offsets.values()) {
           count += entry;
         }
-        if (count == totalCount + 2) {
+        if (count == totalCount) {
           // wait until all offsets add up to totalCount messages + 2 control 
END_TUPLE
           latch.countDown();
         }
@@ -188,10 +189,34 @@ public class OffsetManagerTest extends 
KafkaOperatorTestBase
   @Test
   public void testSimpleConsumerUpdateOffsets() throws Exception
   {
+    initialPos = 10l;
     // Create template simple consumer
     try{
       SimpleKafkaConsumer consumer = new SimpleKafkaConsumer();
-      testPartitionableInputOperator(consumer);
+      testPartitionableInputOperator(consumer, totalCount - (int)initialPos - 
(int)initialPos);
+    } finally {
+      // clean test offset file
+      cleanFile();
+    }
+  }
+
+  /**
+   * Test OffsetManager update offsets in Simple Consumer
+   *
+   * [Generate send 100 messages to Kafka] ==> [wait until the offsets has 
been updated to 102 or timeout after 30s which means offset has not been 
updated]
+   *
+   * Initial offsets are invalid, reset to earliest and get all messages
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSimpleConsumerInvalidInitialOffsets() throws Exception
+  {
+    initialPos = 1000l;
+    // Create template simple consumer
+    try{
+      SimpleKafkaConsumer consumer = new SimpleKafkaConsumer();
+      testPartitionableInputOperator(consumer, totalCount);
     } finally {
       // clean test offset file
       cleanFile();
@@ -207,7 +232,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     }
   }
 
-  public void testPartitionableInputOperator(KafkaConsumer consumer) throws 
Exception{
+  public void testPartitionableInputOperator(KafkaConsumer consumer, int 
expectedCount) throws Exception{
 
     // Set to 3 because we want to make sure END_TUPLE from both 2 partitions 
are received and offsets has been updated to 102
     latch = new CountDownLatch(3);
@@ -241,7 +266,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     consumer.setTopic(TEST_TOPIC);
     //set the zookeeper list used to initialize the partition
     SetMultimap<String, String> zookeeper = HashMultimap.create();
-    String zks = KafkaPartition.DEFAULT_CLUSTERID + "::localhost:" + 
KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0];
+    String zks = "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0];
     consumer.setZookeeper(zks);
     consumer.setInitialOffset("earliest");
 
@@ -260,11 +285,11 @@ public class OffsetManagerTest extends 
KafkaOperatorTestBase
     lc.runAsync();
 
     // Wait 30s for consumer finish consuming all the messages and offsets has 
been updated to 100
-    assertTrue("TIMEOUT: 30s ", latch.await(30000, TimeUnit.MILLISECONDS));
+    assertTrue("TIMEOUT: 30s, collected " + collectedTuples + " tuples", 
latch.await(30000, TimeUnit.MILLISECONDS));
 
 
     // Check results
-    assertEquals("Tuple count", totalCount -10 -10, collectedTuples.size());
+    assertEquals("Tuple count", expectedCount, collectedTuples.size());
     logger.debug(String.format("Number of emitted tuples: %d", 
collectedTuples.size()));
 
     p.close();

Reply via email to