Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master cec33da88 -> 5775a5390
Fixes for the following issues:
- Committed offsets are not present in offset manager storage.
- Operator partitions are reporting offsets to the stats listener for Kafka partitions they don't subscribe to.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5775a539
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5775a539
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5775a539

Branch: refs/heads/master
Commit: 5775a53904399b561335182bc822ecbc6f45f787
Parents: cec33da
Author: Pramod Immaneni <[email protected]>
Authored: Tue Feb 9 20:52:56 2016 -0800
Committer: Thomas Weise <[email protected]>
Committed: Fri Feb 12 11:58:38 2016 -0800

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5775a539/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 4b22e5e..b166b9e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -643,7 +643,9 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
       if (p.getPartitionedInstance().getConsumer() instanceof SimpleKafkaConsumer) {
         p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
         if (initOffsets != null) {
-          p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+          //Don't send all offsets to all partitions
+          //p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+          p.getPartitionedInstance().offsetStats.putAll(p.getPartitionedInstance().getConsumer().getCurrentOffsets());
         }
       }
       newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
@@ -715,7 +717,11 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   {
     //In every partition check interval, call offsetmanager to update the offsets
     if (offsetManager != null) {
-      offsetManager.updateOffsets(getOffsetsForPartitions(kstats));
+      Map<KafkaPartition, Long> offsetsForPartitions = getOffsetsForPartitions(kstats);
+      if (offsetsForPartitions.size() > 0) {
+        logger.debug("Passing offset updates to offset manager");
+        offsetManager.updateOffsets(offsetsForPartitions);
+      }
     }
   }
 
@@ -743,15 +749,18 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
 
     long t = System.currentTimeMillis();
 
+    // If stats are available then update offsets
+    // Do this before re-partition interval check below to not miss offset updates
+    if (kstats.size() > 0) {
+      logger.debug("Checking offset updates for offset manager");
+      updateOffsets(kstats);
+    }
+
     if (t - lastCheckTime < repartitionCheckInterval) {
       // return false if it's within repartitionCheckInterval since last time it check the stats
       return false;
     }
 
-    logger.debug("Use OffsetManager to update offsets");
-    updateOffsets(kstats);
-
-
     if(repartitionInterval < 0){
       // if repartition is disabled
       return false;
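
----------------------------------------------------------------------
For context, the first hunk changes which offsets an operator partition reports.
Below is a minimal, self-contained sketch of that idea only; it does not use the
Malhar API — the class name, the string "topic-N" keys, and the map names are
simplified stand-ins for KafkaPartition and the consumer's offset view.

import java.util.HashMap;
import java.util.Map;

public class OffsetReportingSketch
{
  public static void main(String[] args)
  {
    // Offsets restored from offset manager storage for ALL Kafka partitions of the topic.
    Map<String, Long> initOffsets = new HashMap<>();
    initOffsets.put("topic-0", 100L);
    initOffsets.put("topic-1", 250L);

    // This operator partition's consumer only subscribes to "topic-0",
    // so its current-offset view contains just that Kafka partition.
    Map<String, Long> consumerCurrentOffsets = new HashMap<>();
    consumerCurrentOffsets.put("topic-0", 100L);

    Map<String, Long> offsetStats = new HashMap<>();

    // Before the fix: every operator partition copied the full restored map into its
    // stats, reporting offsets for Kafka partitions it does not consume.
    // offsetStats.putAll(initOffsets);

    // After the fix: report only what this partition's consumer actually tracks.
    offsetStats.putAll(consumerCurrentOffsets);

    System.out.println("Reported offset stats: " + offsetStats);
  }
}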

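----------------------------------------------------------------------
The remaining hunks move the offset update ahead of the repartition-interval
early return and skip empty updates, which is what keeps committed offsets
flowing into offset manager storage. The sketch below illustrates that flow
under simplified assumptions: OffsetManager is a hypothetical stand-in
interface and the stats are reduced to a plain offset map, not Malhar's
KafkaMeterStats.

import java.util.HashMap;
import java.util.Map;

public class StatsListenerSketch
{
  interface OffsetManager
  {
    void updateOffsets(Map<String, Long> offsets);
  }

  private final OffsetManager offsetManager;
  private final long repartitionCheckInterval = 5000;
  private long lastCheckTime;

  StatsListenerSketch(OffsetManager offsetManager)
  {
    this.offsetManager = offsetManager;
  }

  boolean processStats(Map<String, Long> collectedOffsets)
  {
    long t = System.currentTimeMillis();

    // Push offsets BEFORE the repartition-interval early return, and only when
    // there is something to report, so updates are neither skipped nor empty.
    if (!collectedOffsets.isEmpty() && offsetManager != null) {
      offsetManager.updateOffsets(collectedOffsets);
    }

    if (t - lastCheckTime < repartitionCheckInterval) {
      // Too early to consider repartitioning; offsets were still forwarded above.
      return false;
    }
    lastCheckTime = t;

    // ... repartition decision would follow here in the real operator ...
    return false;
  }

  public static void main(String[] args)
  {
    StatsListenerSketch listener = new StatsListenerSketch(
        offsets -> System.out.println("Offset manager received: " + offsets));

    Map<String, Long> offsets = new HashMap<>();
    offsets.put("topic-0", 100L);
    listener.processStats(offsets);          // offsets forwarded
    listener.processStats(new HashMap<>());  // nothing forwarded
  }
}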