yanghua commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provide Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213034555
##########
File path: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
##########
@@ -350,4 +363,201 @@ public boolean isEndOfStream(Long nextElement) {
}
}
+    /**
+     * Kafka 0.11 specific test, ensuring Kafka Headers are properly written to and read from Kafka.
+     */
+    @Test(timeout = 60000)
+    public void testHeaders() throws Exception {
+        final String topic = "headers-topic";
+        final long testSequenceLength = 127L;
+        createTestTopic(topic, 3, 1);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+        env.getConfig().disableSysoutLogging();
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+        DataStream<Long> testSequence = env.addSource(new SourceFunction<Long>() {
+            private static final long serialVersionUID = 1L;
+            private volatile boolean running = true;
+
+            @Override
+            public void run(SourceContext<Long> ctx) throws Exception {
+                long i = 0;
+                while (running) {
+                    ctx.collectWithTimestamp(i, i * 2);
+                    if (i++ == testSequenceLength) {
+                        running = false;
+                    }
+                }
+            }
+
+            @Override
+            public void cancel() {
+                running = false;
+            }
+        });
+
+        FlinkKafkaProducer011<Long> producer = new FlinkKafkaProducer011<>(topic,
+            new TestHeadersKeyedSerializationSchema(topic), standardProps, Optional.empty());
+        testSequence.addSink(producer).setParallelism(3);
+        env.execute("Produce some data");
+
+        // Now consume the data again and check that the headers were deserialized correctly.
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+        env.getConfig().disableSysoutLogging();
+        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+        FlinkKafkaConsumer011<TestHeadersElement> kafkaSource = new FlinkKafkaConsumer011<>(topic, new TestHeadersKeyedDeserializationSchema(testSequenceLength), standardProps);
+
+        env.addSource(kafkaSource).addSink(new TestHeadersElementValid());
+        env.execute("Consume again");
+
+        deleteTestTopic(topic);
+    }
+
+    /**
+     * Element consisting of key, value and headers, where the headers are represented
+     * as a list of tuples: (key, list of Bytes).
+     */
+    public static class TestHeadersElement extends Tuple3<Long, Byte, List<Tuple2<String, List<Byte>>>> {
+
+    }
+
+    /**
+     * Generates "headers" for the given element.
+     *
+     * @param element - sequence element
+     * @return headers
+     */
+    private static Iterable<Map.Entry<String, byte[]>> headersFor(Long element) {
+        final long x = element;
+        return Arrays.asList(
+            new AbstractMap.SimpleImmutableEntry<>("low", new byte[]{
+                (byte) ((x >>> 8) & 0xFF),
+                (byte) ((x) & 0xFF)
+            }),
+            new AbstractMap.SimpleImmutableEntry<>("low", new byte[]{
+                (byte) ((x >>> 24) & 0xFF),
+                (byte) ((x >>> 16) & 0xFF)
+            }),
+            new AbstractMap.SimpleImmutableEntry<>("high", new byte[]{
+                (byte) ((x >>> 40) & 0xFF),
+                (byte) ((x >>> 32) & 0xFF)
+            }),
+            new AbstractMap.SimpleImmutableEntry<>("high", new byte[]{
+                (byte) ((x >>> 56) & 0xFF),
+                (byte) ((x >>> 48) & 0xFF)
+            })
+        );
+    }
+
+    /**
+     * Converts headers into a list-of-tuples representation. A list of tuples is more
+     * convenient to use in assert expressions, because tuples implement equals().
+     *
+     * @param headers - headers
+     * @return list of tuples (string, list of Bytes)
+     */
+    private static List<Tuple2<String, List<Byte>>> headersAsList(Iterable<Map.Entry<String, byte[]>> headers) {
+        List<Tuple2<String, List<Byte>>> r = new ArrayList<>();
+        for (Map.Entry<String, byte[]> entry : headers) {
+            final Tuple2<String, List<Byte>> t = new Tuple2<>();
+            t.f0 = entry.getKey();
+            t.f1 = new ArrayList<>(entry.getValue().length);
+            for (byte b : entry.getValue()) {
+                t.f1.add(b);
+            }
+            r.add(t);
+        }
+        return r;
+    }
+
+    /**
+     * Sink consuming TestHeadersElement. For each element it re-generates the headers
+     * from the message value and validates that they equal the headers that were read
+     * from Kafka alongside the element.
+     */
+    private static class TestHeadersElementValid implements SinkFunction<TestHeadersElement> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void invoke(TestHeadersElement value, Context context) throws Exception {
+            // calculate the expected headers from the message value
+            final Iterable<Map.Entry<String, byte[]>> headers = headersFor(value.f0);
+            final List<Tuple2<String, List<Byte>>> expected = headersAsList(headers);
+            assertEquals(expected, value.f2);
+        }
+    }
+
+    /**
+     * Serialization schema which serializes the given element as the value and the lowest
+     * element byte as the key. The low 32 bits of the element are stored as two "low"
+     * headers, each carrying a 16-bit part as its value; likewise, the high 32 bits are
+     * stored as two "high" headers.
+     */
+    private static class TestHeadersKeyedSerializationSchema implements KeyedSerializationSchema<Long> {
+        private final String topic;
+
+        TestHeadersKeyedSerializationSchema(String topic) {
+            this.topic = Objects.requireNonNull(topic);
+        }
+
+        @Override
+        public byte[] serializeKey(Long element) {
+            return new byte[] { element.byteValue() };
+        }
+
+        @Override
+        public byte[] serializeValue(Long data) {
+            return data == null ? null : Longs.toByteArray(data);
+        }
+
+        @Override
+        public String getTargetTopic(Long element) {
+            return topic;
+        }
+
+        @Override
+        public Iterable<Map.Entry<String, byte[]>> headers(Long element) {
+            return headersFor(element);
+        }
+    }
+
+    /**
+     * Deserialization schema for TestHeadersElement elements.
+     */
+    private static class TestHeadersKeyedDeserializationSchema implements KeyedDeserializationSchema<TestHeadersElement> {
+        private final long count;
+
+        TestHeadersKeyedDeserializationSchema(long count) {
+            this.count = count;
+        }
+
+        @Override
+        public TypeInformation<TestHeadersElement> getProducedType() {
+            return TypeInformation.of(TestHeadersElement.class);
+        }
+
+        @Override
+        public TestHeadersElement deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+            final TestHeadersElement element = new TestHeadersElement();
+            element.f0 = Longs.fromByteArray(message);
+            element.f1 = messageKey[0];
+            element.f2 = new ArrayList<>(0);
+            return element;
+        }
+
+        @Override
+        public boolean isEndOfStream(TestHeadersElement nextElement) {
+            return nextElement.f0 >= count;
+        }
+
+        @Override
+        public TestHeadersElement deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, Iterable<Map.Entry<String, byte[]>> headers) throws IOException {
Review comment:
   This line is too long.
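   A possible way to shorten it, sketched here on the assumption that the concern is the checkstyle line-length limit, is to give each parameter its own line:

       @Override
       public TestHeadersElement deserialize(
               byte[] messageKey,
               byte[] message,
               String topic,
               int partition,
               long offset,
               Iterable<Map.Entry<String, byte[]>> headers) throws IOException {

   That keeps the signature readable without changing any behavior.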
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services