Repository: flume
Updated Branches:
  refs/heads/trunk 1c8c5e671 -> dffe1dcbc


FLUME-2799 Kafka Source - Add message offset to headers

It seems when solving https://issues.apache.org/jira/browse/FLUME-2799 ,
an oversight resulted in the message offset not being added to the header.

This change corrects this.

This closes #238

Reviewers: Ferenc Szabo, Peter Turcsanyi

(Jehan Bruggeman via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dffe1dcb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dffe1dcb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dffe1dcb

Branch: refs/heads/trunk
Commit: dffe1dcbc79987099120108ffc0bb10a17a25ea2
Parents: 1c8c5e6
Author: Ferenc Szabo <[email protected]>
Authored: Thu Nov 22 15:45:52 2018 +0100
Committer: Ferenc Szabo <[email protected]>
Committed: Thu Nov 22 15:45:52 2018 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/flume/source/kafka/KafkaSource.java | 4 ++++
 .../java/org/apache/flume/source/kafka/KafkaSourceConstants.java | 1 +
 2 files changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/dffe1dcb/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index ddffa87..20f7c7d 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -269,6 +269,10 @@ public class KafkaSource extends AbstractPollableSource
           headers.put(KafkaSourceConstants.PARTITION_HEADER,
               String.valueOf(message.partition()));
         }
+        if (!headers.containsKey(OFFSET_HEADER)) {
+          headers.put(OFFSET_HEADER,
+              String.valueOf(message.offset()));
+        }
 
         if (kafkaKey != null) {
           headers.put(KafkaSourceConstants.KEY_HEADER, kafkaKey);

http://git-wip-us.apache.org/repos/asf/flume/blob/dffe1dcb/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 474a143..0e15e73 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -53,6 +53,7 @@ public class KafkaSourceConstants {
   public static final String KEY_HEADER = "key";
   public static final String TIMESTAMP_HEADER = "timestamp";
   public static final String PARTITION_HEADER = "partition";
+  public static final String OFFSET_HEADER = "offset";
 
   public static final String SET_TOPIC_HEADER = "setTopicHeader";
   public static final boolean DEFAULT_SET_TOPIC_HEADER = true;

Reply via email to