Fixes for the following issues: (1) Committed offsets are not present in offset manager storage. (2) Operator partitions are reporting offsets to the stats listener for Kafka partitions they don't subscribe to.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5775a539 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5775a539 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5775a539 Branch: refs/heads/devel-3 Commit: 5775a53904399b561335182bc822ecbc6f45f787 Parents: cec33da Author: Pramod Immaneni <[email protected]> Authored: Tue Feb 9 20:52:56 2016 -0800 Committer: Thomas Weise <[email protected]> Committed: Fri Feb 12 11:58:38 2016 -0800 ---------------------------------------------------------------------- .../kafka/AbstractKafkaInputOperator.java | 21 ++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5775a539/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java index 4b22e5e..b166b9e 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java @@ -643,7 +643,9 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem if (p.getPartitionedInstance().getConsumer() instanceof SimpleKafkaConsumer) { p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets); if (initOffsets != null) { - p.getPartitionedInstance().offsetStats.putAll(initOffsets); + //Don't send all offsets to all partitions + //p.getPartitionedInstance().offsetStats.putAll(initOffsets); + 
p.getPartitionedInstance().offsetStats.putAll(p.getPartitionedInstance().getConsumer().getCurrentOffsets()); } } newManagers.add(p.getPartitionedInstance().idempotentStorageManager); @@ -715,7 +717,11 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem { //In every partition check interval, call offsetmanager to update the offsets if (offsetManager != null) { - offsetManager.updateOffsets(getOffsetsForPartitions(kstats)); + Map<KafkaPartition, Long> offsetsForPartitions = getOffsetsForPartitions(kstats); + if (offsetsForPartitions.size() > 0) { + logger.debug("Passing offset updates to offset manager"); + offsetManager.updateOffsets(offsetsForPartitions); + } } } @@ -743,15 +749,18 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem long t = System.currentTimeMillis(); + // If stats are available then update offsets + // Do this before re-partition interval check below to not miss offset updates + if (kstats.size() > 0) { + logger.debug("Checking offset updates for offset manager"); + updateOffsets(kstats); + } + if (t - lastCheckTime < repartitionCheckInterval) { // return false if it's within repartitionCheckInterval since last time it check the stats return false; } - logger.debug("Use OffsetManager to update offsets"); - updateOffsets(kstats); - - if(repartitionInterval < 0){ // if repartition is disabled return false;
