Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/release-3.3 dc0e1a038 -> b00ff80f4
Fixes for the following issues:

- Committed offsets are not present in offset manager storage.
- Operator partitions are reporting offsets to stats listener for Kafka
  partitions they don't subscribe to.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b00ff80f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b00ff80f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b00ff80f

Branch: refs/heads/release-3.3
Commit: b00ff80f4aa47010306513da9e0b2d6b73a2a494
Parents: dc0e1a0
Author: Pramod Immaneni <[email protected]>
Authored: Tue Feb 9 20:52:56 2016 -0800
Committer: Pramod Immaneni <[email protected]>
Committed: Tue Feb 9 20:52:56 2016 -0800

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b00ff80f/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index c7bac18..671a76f 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -651,7 +651,9 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
       if (p.getPartitionedInstance().getConsumer() instanceof SimpleKafkaConsumer) {
         p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
         if (initOffsets != null) {
-          p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+          //Don't send all offsets to all partitions
+          //p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+          p.getPartitionedInstance().offsetStats.putAll(p.getPartitionedInstance().getConsumer().getCurrentOffsets());
         }
       }
       newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
@@ -723,7 +725,11 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   {
     //In every partition check interval, call offsetmanager to update the offsets
     if (offsetManager != null) {
-      offsetManager.updateOffsets(getOffsetsForPartitions(kstats));
+      Map<KafkaPartition, Long> offsetsForPartitions = getOffsetsForPartitions(kstats);
+      if (offsetsForPartitions.size() > 0) {
+        logger.debug("Passing offset updates to offset manager");
+        offsetManager.updateOffsets(offsetsForPartitions);
+      }
     }
   }
 
@@ -751,15 +757,18 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
 
     long t = System.currentTimeMillis();
 
+    // If stats are available then update offsets
+    // Do this before re-partition interval check below to not miss offset updates
+    if (kstats.size() > 0) {
+      logger.debug("Checking offset updates for offset manager");
+      updateOffsets(kstats);
+    }
+
     if (t - lastCheckTime < repartitionCheckInterval) {
       // return false if it's within repartitionCheckInterval since last time it check the stats
       return false;
     }
 
-    logger.debug("Use OffsetManager to update offsets");
-    updateOffsets(kstats);
-
-
     if(repartitionInterval < 0){
       // if repartition is disabled
       return false;
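
For readers skimming the diff, below is a minimal, self-contained sketch of the
two corrected behaviours: each partition populates offsetStats only from its own
consumer's current offsets, and offset updates reach the offset manager before
the repartition-interval early return. This is not Malhar code; the class
OffsetReportingSketch, the plain-integer partition keys, and the hard-coded
interval are illustrative assumptions standing in for the real types.

import java.util.HashMap;
import java.util.Map;

public class OffsetReportingSketch
{
  // Stand-in for the per-operator offset map, keyed here by plain partition id.
  private final Map<Integer, Long> offsetStats = new HashMap<Integer, Long>();

  private long lastCheckTime = 0;
  private final long repartitionCheckInterval = 5000;

  // Stand-in for the consumer's getCurrentOffsets(): returns offsets only for
  // the Kafka partitions this consumer actually subscribes to.
  private Map<Integer, Long> getCurrentOffsets()
  {
    Map<Integer, Long> current = new HashMap<Integer, Long>();
    current.put(0, 42L);
    return current;
  }

  // Fix 1: record only this consumer's own offsets, not the global initOffsets map.
  public void initializeOffsetStats(Map<Integer, Long> initOffsets)
  {
    if (initOffsets != null) {
      offsetStats.putAll(getCurrentOffsets());
    }
  }

  // Fix 2: push offsets to the offset manager whenever stats are present,
  // before the repartition-interval early return, so no update is skipped.
  public boolean processStats(Map<Integer, Long> kstats)
  {
    if (!kstats.isEmpty()) {
      updateOffsets(kstats);
    }

    long t = System.currentTimeMillis();
    if (t - lastCheckTime < repartitionCheckInterval) {
      // too soon since the last check; repartition decision deferred
      return false;
    }
    lastCheckTime = t;
    return false; // repartition logic elided in this sketch
  }

  private void updateOffsets(Map<Integer, Long> offsets)
  {
    if (!offsets.isEmpty()) {
      System.out.println("Passing offset updates to offset manager: " + offsets);
    }
  }
}

Moving the offset update ahead of the repartitionCheckInterval early return is
what lets committed offsets reach offset manager storage even when no
repartition check is due yet.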
