APEXMALHAR-1973 #comment disable committing offsets for latest|earliest and store the offset for next message
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c464f064
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c464f064
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c464f064

Branch: refs/heads/master
Commit: c464f064b590b786f257a765df026cb29a50c8ae
Parents: b431eb3
Author: Siyuan Hua <[email protected]>
Authored: Wed Jan 13 17:07:18 2016 -0800
Committer: Siyuan Hua <[email protected]>
Committed: Wed Jan 13 17:12:08 2016 -0800

----------------------------------------------------------------------
 .../apache/apex/malhar/kafka/AbstractKafkaInputOperator.java | 4 +++-
 .../apache/apex/malhar/kafka/AbstractKafkaPartitioner.java   | 7 ++++---
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c464f064/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 4f2f704..c021c1c 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -177,6 +177,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   @Override
   public void committed(long windowId)
   {
+    if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST)
+      return;
     //ask kafka consumer wrapper to store the committed offsets
     for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) {
       Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
@@ -202,7 +204,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
         emitTuple(tuple.getLeft(), msg);
         AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(),
             msg.topic(), msg.partition());
-        offsetTrack.put(pm, msg.offset());
+        offsetTrack.put(pm, msg.offset() + 1);
       }
       emitCount += count;
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c464f064/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 2159e4f..57c6998 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -256,14 +256,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
         return false;
       }
       PartitionMeta that = (PartitionMeta)o;
-      return Objects.equals(cluster, that.cluster) &&
-        Objects.equals(getTopicPartition(), that.getTopicPartition());
+      return Objects.equals(partitionId, that.partitionId) &&
+        Objects.equals(cluster, that.cluster) &&
+        Objects.equals(topic, that.topic);
     }
 
     @Override
     public int hashCode()
     {
-      return Objects.hash(cluster, getTopicPartition());
+      return Objects.hash(cluster, topic, partitionId);
     }
 
     @Override
