APEXMALHAR-1973 #comment disable committing offsets for latest|earliest and store the offset for next message
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ca1c7e60 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ca1c7e60 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ca1c7e60 Branch: refs/heads/release-3.3 Commit: ca1c7e60f91779d7c19ecc70baa997d7ae2a2c47 Parents: be71996 Author: Siyuan Hua <[email protected]> Authored: Wed Jan 13 17:07:18 2016 -0800 Committer: Thomas Weise <[email protected]> Committed: Sat Feb 20 00:17:42 2016 -0800 ---------------------------------------------------------------------- .../apache/apex/malhar/kafka/AbstractKafkaInputOperator.java | 4 +++- .../apache/apex/malhar/kafka/AbstractKafkaPartitioner.java | 7 ++++--- 2 files changed, 7 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca1c7e60/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java index fbff2e7..512f058 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java @@ -179,6 +179,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera @Override public void committed(long windowId) { + if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST) + return; //ask kafka consumer wrapper to store the committed offsets for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) { Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next(); @@ -204,7 +206,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera emitTuple(tuple.getLeft(), msg); AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(), msg.topic(), msg.partition()); - offsetTrack.put(pm, msg.offset()); + offsetTrack.put(pm, msg.offset() + 1); } emitCount += count; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca1c7e60/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java index c708145..53bbd2a 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java @@ -258,14 +258,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa return false; } PartitionMeta that = (PartitionMeta)o; - return Objects.equals(cluster, that.cluster) && - Objects.equals(getTopicPartition(), that.getTopicPartition()); + return Objects.equals(partitionId, that.partitionId) && + Objects.equals(cluster, that.cluster) && + Objects.equals(topic, that.topic); } @Override public int hashCode() { - return Objects.hash(cluster, getTopicPartition()); + return Objects.hash(cluster, topic, partitionId); } @Override
