Fixes for the following issues

Committed offsets are not present in offset manager storage.
Operator partitions are reporting offsets to the stats listener for Kafka partitions they don't subscribe to.
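The gist of both fixes, as a minimal standalone sketch. The class, interface and method names below are illustrative stand-ins rather than the operator's real members; only offsetStats, getCurrentOffsets() and updateOffsets() mirror names that appear in the diff, and the plain String keys replace KafkaPartition for self-containment.

import java.util.HashMap;
import java.util.Map;

// Sketch of the intended behavior, not the actual operator code: each operator
// partition records offsets only for the Kafka partitions its own consumer
// holds, and the offset manager is updated only when there are offsets to report.
public class OffsetReportingSketch
{
  // Hypothetical stand-in for the per-operator-partition offset stats map,
  // keyed here by a simple "topic-partition" string instead of KafkaPartition
  private final Map<String, Long> offsetStats = new HashMap<String, Long>();

  // Hypothetical stand-in for an offset manager that stores committed offsets
  public interface OffsetStore
  {
    void updateOffsets(Map<String, Long> offsetsOfPartitions);
  }

  // Mirrors the first hunk below: seed offsetStats from this consumer's own
  // current offsets rather than copying all initial offsets into every partition
  public void seedOffsetStats(Map<String, Long> currentOffsetsOfThisConsumer)
  {
    offsetStats.putAll(currentOffsetsOfThisConsumer);
  }

  // Mirrors the empty-check from the later hunks: windows without stats are
  // not passed on, so they cannot disturb what the offset manager has stored
  public void reportOffsets(Map<String, Long> offsetsForPartitions, OffsetStore offsetManager)
  {
    if (!offsetsForPartitions.isEmpty()) {
      offsetManager.updateOffsets(offsetsForPartitions);
    }
  }
}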


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5775a539
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5775a539
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5775a539

Branch: refs/heads/devel-3
Commit: 5775a53904399b561335182bc822ecbc6f45f787
Parents: cec33da
Author: Pramod Immaneni <[email protected]>
Authored: Tue Feb 9 20:52:56 2016 -0800
Committer: Thomas Weise <[email protected]>
Committed: Fri Feb 12 11:58:38 2016 -0800

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5775a539/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 4b22e5e..b166b9e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -643,7 +643,9 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     if (p.getPartitionedInstance().getConsumer() instanceof SimpleKafkaConsumer) {
       p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
       if (initOffsets != null) {
-        p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+        //Don't send all offsets to all partitions
+        //p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+        p.getPartitionedInstance().offsetStats.putAll(p.getPartitionedInstance().getConsumer().getCurrentOffsets());
       }
     }
     newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
@@ -715,7 +717,11 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   {
     //In every partition check interval, call offsetmanager to update the offsets
     if (offsetManager != null) {
-      offsetManager.updateOffsets(getOffsetsForPartitions(kstats));
+      Map<KafkaPartition, Long> offsetsForPartitions = getOffsetsForPartitions(kstats);
+      if (offsetsForPartitions.size() > 0) {
+        logger.debug("Passing offset updates to offset manager");
+        offsetManager.updateOffsets(offsetsForPartitions);
+      }
     }
   }
 
@@ -743,15 +749,18 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
 
     long t = System.currentTimeMillis();
 
+    // If stats are available then update offsets
+    // Do this before re-partition interval check below to not miss offset updates
+    if (kstats.size() > 0) {
+      logger.debug("Checking offset updates for offset manager");
+      updateOffsets(kstats);
+    }
+
     if (t - lastCheckTime < repartitionCheckInterval) {
       // return false if it's within repartitionCheckInterval since last time it check the stats
       return false;
     }
 
-    logger.debug("Use OffsetManager to update offsets");
-    updateOffsets(kstats);
-
-
     if(repartitionInterval < 0){
       // if repartition is disabled
       return false;

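For context on the first issue (committed offsets missing from offset manager storage), here is a hypothetical in-memory offset store of the kind an offset manager implementation might wrap. Only the updateOffsets(Map<KafkaPartition, Long>) call is visible in the diff above; the class name, the String keys and the loadOffsets() method are assumptions for illustration, not the Malhar OffsetManager API.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory offset store, for illustration only; a real offset
// manager would persist the offsets (e.g. to a file system or database) so
// they survive an application restart.
public class InMemoryOffsetStore
{
  private final Map<String, Long> committedOffsets = new ConcurrentHashMap<String, Long>();

  // Invoked with the latest offsets reported by the operator partitions; with
  // the fix above this only happens when there are offsets to store.
  public void updateOffsets(Map<String, Long> offsetsOfPartitions)
  {
    committedOffsets.putAll(offsetsOfPartitions);
  }

  // Illustrative read side: offsets a restarted application could resume from
  public Map<String, Long> loadOffsets()
  {
    return new HashMap<String, Long>(committedOffsets);
  }
}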