Repository: flume Updated Branches: refs/heads/trunk 1c8c5e671 -> dffe1dcbc
FLUME-2799 Kafka Source - Add message offset to headers It seems that when resolving https://issues.apache.org/jira/browse/FLUME-2799, an oversight resulted in the message offset not being added to the headers. This change corrects that oversight. This closes #238 Reviewers: Ferenc Szabo, Peter Turcsanyi (Jehan Bruggeman via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dffe1dcb Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dffe1dcb Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dffe1dcb Branch: refs/heads/trunk Commit: dffe1dcbc79987099120108ffc0bb10a17a25ea2 Parents: 1c8c5e6 Author: Ferenc Szabo <[email protected]> Authored: Thu Nov 22 15:45:52 2018 +0100 Committer: Ferenc Szabo <[email protected]> Committed: Thu Nov 22 15:45:52 2018 +0100 ---------------------------------------------------------------------- .../src/main/java/org/apache/flume/source/kafka/KafkaSource.java | 4 ++++ .../java/org/apache/flume/source/kafka/KafkaSourceConstants.java | 1 + 2 files changed, 5 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/dffe1dcb/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index ddffa87..20f7c7d 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -269,6 +269,10 @@ public class KafkaSource extends AbstractPollableSource headers.put(KafkaSourceConstants.PARTITION_HEADER, String.valueOf(message.partition())); 
} + if (!headers.containsKey(OFFSET_HEADER)) { + headers.put(OFFSET_HEADER, + String.valueOf(message.offset())); + } if (kafkaKey != null) { headers.put(KafkaSourceConstants.KEY_HEADER, kafkaKey); http://git-wip-us.apache.org/repos/asf/flume/blob/dffe1dcb/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java index 474a143..0e15e73 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java @@ -53,6 +53,7 @@ public class KafkaSourceConstants { public static final String KEY_HEADER = "key"; public static final String TIMESTAMP_HEADER = "timestamp"; public static final String PARTITION_HEADER = "partition"; + public static final String OFFSET_HEADER = "offset"; public static final String SET_TOPIC_HEADER = "setTopicHeader"; public static final boolean DEFAULT_SET_TOPIC_HEADER = true;
