[ https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16594037#comment-16594037 ]

ASF GitHub Bot commented on FLINK-8354:
---------------------------------------

alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213060361
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
 ##########
 @@ -350,4 +363,201 @@ public boolean isEndOfStream(Long nextElement) {
                }
        }
 
+       /**
+        * Kafka 0.11 specific test, ensuring Kafka Headers are properly written to and read from Kafka.
+        */
+       @Test(timeout = 60000)
+       public void testHeaders() throws Exception {
+               final String topic = "headers-topic";
+               final long testSequenceLength = 127L;
+               createTestTopic(topic, 3, 1);
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env.getConfig().disableSysoutLogging();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               DataStream<Long> testSequence = env.addSource(new SourceFunction<Long>() {
+                       private static final long serialVersionUID = 1L;
+                       private volatile boolean running = true;
+
+                       @Override
+                       public void run(SourceContext<Long> ctx) throws Exception {
+                               long i = 0;
+                               while (running) {
+                                       ctx.collectWithTimestamp(i, i * 2);
+                                       if (i++ == testSequenceLength) {
+                                               running = false;
+                                       }
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+                               running = false;
+                       }
+               });
+
+               FlinkKafkaProducer011<Long> producer = new FlinkKafkaProducer011<>(topic,
+                       new TestHeadersKeyedSerializationSchema(topic), standardProps, Optional.empty());
+               testSequence.addSink(producer).setParallelism(3);
+               env.execute("Produce some data");
+
+               // Now consume the data and check that the headers were deserialized correctly
+               env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env.getConfig().disableSysoutLogging();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               FlinkKafkaConsumer011<TestHeadersElement> kafkaSource = new FlinkKafkaConsumer011<>(
+                       topic, new TestHeadersKeyedDeserializationSchema(testSequenceLength), standardProps);
+
+               env.addSource(kafkaSource).addSink(new TestHeadersElementValid());
+               env.execute("Consume again");
+
+               deleteTestTopic(topic);
+       }
+
+       /**
+        * Test element consisting of the message value (f0), the message key byte (f1) and the
+        * headers (f2), with headers represented as a list of (name, list of Bytes) tuples.
+        */
+       public static class TestHeadersElement extends Tuple3<Long, Byte, List<Tuple2<String, List<Byte>>>> {
+       }
+
+       /**
+        * Generates the test "headers" for the given element: its four 16-bit chunks, stored as
+        * two "low" and two "high" headers.
+        * @param element - sequence element
+        * @return headers
+        */
+       private static Iterable<Map.Entry<String, byte[]>> headersFor(Long element) {
+               final long x = element;
+               return Arrays.asList(
+                       new AbstractMap.SimpleImmutableEntry<>("low", new 
byte[]{
+                               (byte) ((x >>> 8) & 0xFF),
+                               (byte) ((x) & 0xFF)
+                       }),
+                       new AbstractMap.SimpleImmutableEntry<>("low", new 
byte[]{
+                               (byte) ((x >>> 24) & 0xFF),
+                               (byte) ((x >>> 16) & 0xFF)
+                       }),
+                       new AbstractMap.SimpleImmutableEntry<>("high", new 
byte[]{
+                               (byte) ((x >>> 40) & 0xFF),
+                               (byte) ((x >>> 32) & 0xFF)
+                       }),
+                       new AbstractMap.SimpleImmutableEntry<>("high", new 
byte[]{
+                               (byte) ((x >>> 56) & 0xFF),
+                               (byte) ((x >>> 48) & 0xFF)
+                       })
+               );
+       }
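+
+       /**
+        * Editor's sketch for illustration (not part of the original change): the inverse of
+        * headersFor, reassembling the element from its four 16-bit header chunks, assuming
+        * the headers arrive in the order produced above.
+        */
+       private static long valueFromHeaders(Iterable<Map.Entry<String, byte[]>> headers) {
+               long value = 0;
+               int shift = 0;
+               for (Map.Entry<String, byte[]> entry : headers) {
+                       final byte[] chunk = entry.getValue();
+                       // chunk[0] holds the high byte of the 16-bit part, chunk[1] the low byte
+                       value |= ((long) (chunk[0] & 0xFF) << (shift + 8)) | ((long) (chunk[1] & 0xFF) << shift);
+                       shift += 16;
+               }
+               return value;
+       }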
+
+       /**
+        * Converts headers into a list-of-tuples representation. The list of tuples is more
+        * convenient to use in assert expressions, because tuples implement equals.
+        * @param headers - headers
+        * @return list of tuples (string, list of Bytes)
+        */
+       private static List<Tuple2<String, List<Byte>>> headersAsList(Iterable<Map.Entry<String, byte[]>> headers) {
+               List<Tuple2<String, List<Byte>>> r = new ArrayList<>();
+               for (Map.Entry<String, byte[]> entry: headers) {
+                       final Tuple2<String, List<Byte>> t = new Tuple2<>();
+                       t.f0 = entry.getKey();
+                       t.f1 = new ArrayList<>(entry.getValue().length);
+                       for (byte b: entry.getValue()) {
+                               t.f1.add(b);
+                       }
+                       r.add(t);
+               }
+               return r;
+       }
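+
+       // Worked example (editor's illustration): for element 5L, headersAsList(headersFor(5L))
+       // yields [("low", [0, 5]), ("low", [0, 0]), ("high", [0, 0]), ("high", [0, 0])].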
+
+       /**
+        * Sink that validates consumed TestHeadersElement instances: it regenerates the headers
+        * from the message value and asserts that they equal the headers read from Kafka.
+        */
+       private static class TestHeadersElementValid implements SinkFunction<TestHeadersElement> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public void invoke(TestHeadersElement value, Context context) throws Exception {
+                       // calculate Headers from message
+                       final Iterable<Map.Entry<String, byte[]>> headers = headersFor(value.f0);
+                       final List<Tuple2<String, List<Byte>>> expected = headersAsList(headers);
+                       assertEquals(expected, value.f2);
+               }
+       }
+
+       /**
+        * Serialization schema that writes the given element as the message value and its lowest
+        * byte as the message key. The low 32 bits of the element are stored as two "low" headers
+        * (one 16-bit part per header value), and likewise the high 32 bits as two "high" headers.
+        */
+       private static class TestHeadersKeyedSerializationSchema implements KeyedSerializationSchema<Long> {
+               private final String topic;
+
+               TestHeadersKeyedSerializationSchema(String topic) {
+                       this.topic = Objects.requireNonNull(topic);
+               }
+
+               @Override
+               public byte[] serializeKey(Long element) {
+                       return new byte[] { element.byteValue() };
+               }
+
+               @Override
+               public byte[] serializeValue(Long data) {
+                       return data == null ? null : Longs.toByteArray(data);
+               }
+
+               @Override
+               public String getTargetTopic(Long element) {
+                       return topic;
+               }
+
+               @Override
+               public Iterable<Map.Entry<String, byte[]>> headers(Long element) {
+                       return headersFor(element);
+               }
+       }
+
+       /**
+        * Deserialization schema for TestHeadersElement elements.
+        */
+       private static class TestHeadersKeyedDeserializationSchema implements KeyedDeserializationSchema<TestHeadersElement> {
+               private final long count;
+
+               TestHeadersKeyedDeserializationSchema(long count) {
+                       this.count = count;
+               }
+
+               @Override
+               public TypeInformation<TestHeadersElement> getProducedType() {
+                       return TypeInformation.of(TestHeadersElement.class);
+               }
+
+               @Override
+               public TestHeadersElement deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+                       final TestHeadersElement element = new TestHeadersElement();
+                       element.f0 = Longs.fromByteArray(message);
+                       element.f1 = messageKey[0];
+                       element.f2 = new ArrayList<>(0);
+                       return element;
+               }
+
+               @Override
+               public boolean isEndOfStream(TestHeadersElement nextElement) {
+                       return nextElement.f0 >= count;
+               }
+
+               @Override
+               public TestHeadersElement deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, Iterable<Map.Entry<String, byte[]>> headers) throws IOException {
 
 Review comment:
   Thank you. Will do

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message headers
> -----------------------------------------------------
>
>                 Key: FLINK-8354
>                 URL: https://issues.apache.org/jira/browse/FLINK-8354
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>         Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>            Reporter: Mohammad Abareghi
>            Assignee: Aegeaner
>            Priority: Major
>              Labels: pull-request-available
>
> Kafka introduced the notion of headers for messages in version 0.11.0.0 
> (https://issues.apache.org/jira/browse/KAFKA-4208).
> But flink-connector-kafka-0.11_2.11, which supports Kafka 0.11.0.0, ignores 
> headers when consuming Kafka messages.
> It would be useful in some scenarios, such as distributed log tracing, for 
> FlinkKafkaConsumer011 and FlinkKafkaProducer011 to support message headers.
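
For context, a minimal sketch of how the plain Kafka 0.11 client exposes headers
(an editor's illustration, not part of the issue; the class and method names
HeaderSketch, withTraceId and printHeaders are hypothetical, while ProducerRecord#headers(),
Headers#add(String, byte[]), ConsumerRecord#headers() and Header#key()/#value() are the
standard Kafka 0.11 API):

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.Header;

    class HeaderSketch {

            // Attach a tracing header when producing.
            static ProducerRecord<String, String> withTraceId(String traceId) {
                    ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
                    record.headers().add("trace-id", traceId.getBytes(StandardCharsets.UTF_8));
                    return record;
            }

            // Read the headers back on the consumer side (the record comes from KafkaConsumer#poll).
            static void printHeaders(ConsumerRecord<String, String> record) {
                    for (Header header : record.headers()) {
                            System.out.println(header.key() + " = " + new String(header.value(), StandardCharsets.UTF_8));
                    }
            }
    }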



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
