[ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=94250&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94250
 ]

ASF GitHub Bot logged work on BEAM-4038:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Apr/18 19:11
            Start Date: 23/Apr/18 19:11
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #5111: [BEAM-4038] Support 
Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
index 493598672d9..7ddfaa06a4d 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -42,6 +42,7 @@
  * to eliminate the method definition differences.
  */
 class ConsumerSpEL {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(ConsumerSpEL.class);
 
   private SpelParserConfiguration config = new SpelParserConfiguration(true, 
true);
@@ -55,6 +56,21 @@
 
   private boolean hasRecordTimestamp = false;
   private boolean hasOffsetsForTimes = false;
+  static boolean hasHeaders = false;
+
+  static {
+    try {
+      // It is supported by Kafka Client 0.11.0.0 onwards.
+      hasHeaders = ConsumerRecord
+          .class
+          .getMethod("headers", (Class<?>[]) null)
+          .getReturnType()
+          .getName()
+          .equals("org.apache.kafka.common.header.Headers");
+    } catch (NoSuchMethodException | SecurityException e) {
+      LOG.debug("Headers is not available");
+    }
+  }
 
   public ConsumerSpEL() {
     try {
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
index f4b1f1bb67b..25cc27bee4a 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
@@ -17,9 +17,12 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import com.google.common.base.Objects;
 import java.io.Serializable;
 import java.util.Arrays;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.values.KV;
+import org.apache.kafka.common.header.Headers;
 
 /**
  * KafkaRecord contains key and value of the record as well as metadata for 
the record (topic name,
@@ -33,6 +36,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
   private final KV<K, V> kv;
   private final long timestamp;
   private final KafkaTimestampType timestampType;
@@ -43,9 +47,10 @@ public KafkaRecord(
       long offset,
       long timestamp,
       KafkaTimestampType timestampType,
+      @Nullable Headers headers,
       K key,
       V value) {
-    this(topic, partition, offset, timestamp, timestampType, KV.of(key, 
value));
+    this(topic, partition, offset, timestamp, timestampType, headers, 
KV.of(key, value));
   }
 
   public KafkaRecord(
@@ -54,16 +59,17 @@ public KafkaRecord(
       long offset,
       long timestamp,
       KafkaTimestampType timestampType,
+      @Nullable Headers headers,
       KV<K, V> kv) {
     this.topic = topic;
     this.partition = partition;
     this.offset = offset;
     this.timestamp = timestamp;
     this.timestampType = timestampType;
+    this.headers = headers;
     this.kv = kv;
   }
 
-
   public String getTopic() {
     return topic;
   }
@@ -76,6 +82,14 @@ public long getOffset() {
     return offset;
   }
 
+  public Headers getHeaders() {
+    if (!ConsumerSpEL.hasHeaders){
+      throw new RuntimeException("The version kafka-clients does not support 
record headers, "
+          + "please use version 0.11.0.0 or newer");
+    }
+    return headers;
+  }
+
   public KV<K, V> getKV() {
     return kv;
   }
@@ -90,7 +104,7 @@ public KafkaTimestampType getTimestampType() {
 
   @Override
   public int hashCode() {
-    return Arrays.deepHashCode(new Object[]{topic, partition, offset, 
timestamp, kv});
+    return Arrays.deepHashCode(new Object[] {topic, partition, offset, 
timestamp, headers, kv});
   }
 
   @Override
@@ -102,6 +116,7 @@ public boolean equals(Object obj) {
           && partition == other.partition
           && offset == other.offset
           && timestamp == other.timestamp
+          && Objects.equal(headers, other.headers)
           && kv.equals(other.kv);
     } else {
       return false;
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index cb3b953d0bb..519543f3adb 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -20,14 +20,21 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.values.KV;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
 
 /**
  * {@link Coder} for {@link KafkaRecord}.
@@ -37,6 +44,8 @@
   private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
   private static final VarLongCoder longCoder = VarLongCoder.of();
   private static final VarIntCoder intCoder = VarIntCoder.of();
+  private static final IterableCoder headerCoder =
+      IterableCoder.of(KvCoder.of(stringCoder, ByteArrayCoder.of()));
 
   private final KvCoder<K, V> kvCoder;
 
@@ -55,6 +64,7 @@ public void encode(KafkaRecord<K, V> value, OutputStream 
outStream) throws IOExc
     longCoder.encode(value.getOffset(), outStream);
     longCoder.encode(value.getTimestamp(), outStream);
     intCoder.encode(value.getTimestampType().ordinal(), outStream);
+    headerCoder.encode(toIterable(value), outStream);
     kvCoder.encode(value.getKV(), outStream);
   }
 
@@ -66,9 +76,33 @@ public void encode(KafkaRecord<K, V> value, OutputStream 
outStream) throws IOExc
         longCoder.decode(inStream),
         longCoder.decode(inStream),
         KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+        (Headers) toHeaders(headerCoder.decode(inStream)),
         kvCoder.decode(inStream));
   }
 
+  private Object toHeaders(Iterable<KV<String, byte[]>> records) {
+    if (!ConsumerSpEL.hasHeaders) {
+      return null;
+    }
+
+    // ConsumerRecord is used to simply create a list of headers
+    ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("", 
0, 0L, "", "");
+    records.forEach(kv -> consumerRecord.headers().add(kv.getKey(), 
kv.getValue()));
+    return consumerRecord.headers();
+  }
+
+  private Iterable<KV<String, byte[]>> toIterable(KafkaRecord record) {
+    if (!ConsumerSpEL.hasHeaders){
+      return Collections.emptyList();
+    }
+
+    List<KV<String, byte[]>> vals = new ArrayList<>();
+    for (Header header : record.getHeaders()) {
+      vals.add(KV.of(header.key(), header.value()));
+    }
+    return vals;
+  }
+
   @Override
   public List<? extends Coder<?>> getCoderArguments() {
     return kvCoder.getCoderArguments();
@@ -97,6 +131,7 @@ public Object structuralValue(KafkaRecord<K, V> value) {
           value.getOffset(),
           value.getTimestamp(),
           value.getTimestampType(),
+          !ConsumerSpEL.hasHeaders ? null : value.getHeaders(),
           (KV<Object, Object>) kvCoder.structuralValue(value.getKV()));
     }
   }
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index e2c28d52e6f..682b3063781 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -164,7 +164,7 @@ public boolean start() throws IOException {
   }
 
   @Override
-  public boolean advance() {
+  public boolean advance() throws IOException {
     /* Read first record (if any). we need to loop here because :
      *  - (a) some records initially need to be skipped if they are before 
consumedOffset
      *  - (b) if curBatch is empty, we want to fetch next batch and then 
advance.
@@ -212,6 +212,7 @@ public boolean advance() {
             rawRecord.offset(),
             consumerSpEL.getRecordTimestamp(rawRecord),
             consumerSpEL.getRecordTimestampType(rawRecord),
+            ConsumerSpEL.hasHeaders ? rawRecord.headers() : null,
             keyDeserializerInstance.deserialize(rawRecord.topic(), 
rawRecord.key()),
             valueDeserializerInstance.deserialize(rawRecord.topic(), 
rawRecord.value()));
 
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
index 04e86a6a94b..d1114c21f5f 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
@@ -27,6 +27,7 @@
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
@@ -46,13 +47,14 @@
       .map(ts -> {
         Instant result = policy.getTimestampForRecord(
           null, new KafkaRecord<>("topic", 0, 0, now.getMillis() + ts,
-                                  KafkaTimestampType.CREATE_TIME, "key", 
"value"));
+                                      KafkaTimestampType.CREATE_TIME,
+                                      new RecordHeaders(),
+                                      "key", "value"));
         return result.getMillis() - now.getMillis();
       })
       .collect(Collectors.toList());
   }
 
-
   @Test
   public void testCustomTimestampPolicyWithLimitedDelay() {
     // Verifies that max delay is applies appropriately for reporting watermark
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
index 426103dc886..a6344a62134 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
@@ -17,8 +17,17 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -31,4 +40,34 @@ public void testCoderIsSerializableWithWellKnownCoderType() {
     CoderProperties.coderSerializable(
         KafkaRecordCoder.of(GlobalWindow.Coder.INSTANCE, 
GlobalWindow.Coder.INSTANCE));
   }
+
+  @Test
+  public void testKafkaRecordSerializableWithHeaders() throws IOException {
+    RecordHeaders headers = new RecordHeaders();
+    headers.add("headerKey", "headerVal".getBytes());
+    verifySerialization(headers);
+  }
+
+  @Test
+  public void testKafkaRecordSerializableWithoutHeaders() throws IOException {
+    ConsumerRecord consumerRecord = new ConsumerRecord<>("", 0, 0L, "", "");
+    verifySerialization(consumerRecord.headers());
+  }
+
+  private void verifySerialization(Headers headers) throws IOException {
+    KafkaRecord<String, String> kafkaRecord =
+        new KafkaRecord<>(
+            "topic", 0, 0, 0,
+            KafkaTimestampType.CREATE_TIME, headers, "key", "value");
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    KafkaRecordCoder kafkaRecordCoder =
+        KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
+
+    kafkaRecordCoder.encode(kafkaRecord, outputStream);
+    KafkaRecord<String, String> decodedRecord =
+        kafkaRecordCoder.decode(new 
ByteArrayInputStream(outputStream.toByteArray()));
+
+    assertEquals(kafkaRecord, decodedRecord);
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 94250)
    Time Spent: 9h 10m  (was: 9h)

> Support Kafka Headers in KafkaIO
> --------------------------------
>
>                 Key: BEAM-4038
>                 URL: https://issues.apache.org/jira/browse/BEAM-4038
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-kafka
>            Reporter: Geet Kumar
>            Assignee: Raghu Angadi
>            Priority: Minor
>          Time Spent: 9h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to