[FLINK-8741] [kafka] Fix incorrect user code classloader in FlinkKafkaConsumer
This commit fixes incorrectly using the parent of the user code class loader. Since Kafka 010 / 011 versions directly reuse 09 code, this fix fixes the issue for all versions. This commit also extends the Kafka010Example, so that it uses a custom watermark assigner. This allows our end-to-end tests to catch this bug. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0479d6f2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0479d6f2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0479d6f2 Branch: refs/heads/master Commit: 0479d6f254c7a2dc1b7612bd57f726a13926aeb2 Parents: d63bc75 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Fri Feb 23 19:25:06 2018 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Mon Feb 26 23:12:16 2018 +0800 ---------------------------------------------------------------------- .../kafka/internal/Kafka09Fetcher.java | 2 +- .../examples/kafka/Kafka010Example.java | 49 +++++++++++++++++++- .../end-to-end-test/test_streaming_kafka010.sh | 4 +- 3 files changed, 50 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0479d6f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java index 5839360..dcc67d5 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java +++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java @@ -95,7 +95,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> { watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, - userCodeClassLoader.getParent(), + userCodeClassLoader, consumerMetricGroup, useMetrics); http://git-wip-us.apache.org/repos/asf/flink/blob/0479d6f2/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java index 3fbd2b4..881aa67 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java @@ -21,16 +21,26 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; +import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; +import org.apache.flink.streaming.api.watermark.Watermark; import 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; +import javax.annotation.Nullable; /** * An example that shows how to read from and write to Kafka. This will read String messages * from the input topic, prefix them by a configured prefix and output to the output topic. * + * <p>This example also demonstrates using a watermark assigner to generate per-partition + * watermarks directly in the Flink Kafka consumer. For demonstration purposes, it is assumed that + * the String messages are of formatted as a (message,timestamp) tuple. + * * <p>Example usage: * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer */ @@ -59,11 +69,15 @@ public class Kafka010Example { // make parameters available in the web interface env.getConfig().setGlobalJobParameters(parameterTool); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + DataStream<String> input = env - .addSource(new FlinkKafkaConsumer010<>( + .addSource( + new FlinkKafkaConsumer010<>( parameterTool.getRequired("input-topic"), new SimpleStringSchema(), - parameterTool.getProperties())) + parameterTool.getProperties()) + .assignTimestampsAndWatermarks(new CustomWatermarkExtractor())) .map(new PrefixingMapper(prefix)); input.addSink( @@ -76,6 +90,9 @@ public class Kafka010Example { } private static class PrefixingMapper implements MapFunction<String, String> { + + private static final long serialVersionUID = 1180234853172462378L; + private final String prefix; public PrefixingMapper(String prefix) { @@ -87,4 +104,32 @@ public class Kafka010Example { return prefix + value; } } + + /** + * A custom {@link AssignerWithPeriodicWatermarks}, that simply assumes that the input stream + * records are strictly ascending. 
+ * + * <p>Flink also ships some built-in convenience assigners, such as the + * {@link BoundedOutOfOrdernessTimestampExtractor} and {@link AscendingTimestampExtractor} + */ + private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<String> { + + private static final long serialVersionUID = -742759155861320823L; + + private long currentTimestamp = Long.MIN_VALUE; + + @Override + public long extractTimestamp(String element, long previousElementTimestamp) { + // the inputs are assumed to be of format (message,timestamp) + long timestamp = Long.valueOf(element.substring(element.indexOf(",") + 1)); + this.currentTimestamp = timestamp; + return timestamp; + } + + @Nullable + @Override + public Watermark getCurrentWatermark() { + return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/0479d6f2/test-infra/end-to-end-test/test_streaming_kafka010.sh ---------------------------------------------------------------------- diff --git a/test-infra/end-to-end-test/test_streaming_kafka010.sh b/test-infra/end-to-end-test/test_streaming_kafka010.sh index dda2db5..51a570a 100755 --- a/test-infra/end-to-end-test/test_streaming_kafka010.sh +++ b/test-infra/end-to-end-test/test_streaming_kafka010.sh @@ -70,12 +70,12 @@ $FLINK_DIR/bin/flink run -d build-target/examples/streaming/Kafka010Example.jar --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest # send some data to Kafka -echo -e "hello\nwhats\nup" | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-input +echo -e "hello,45218\nwhats,46213\nup,51348" | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-input DATA_FROM_KAFKA=$($KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-output --from-beginning --max-messages 3 2> /dev/null) # 
make sure we have actual newlines in the string, not "\n" -EXPECTED=$(printf "PREFIX:hello\nPREFIX:whats\nPREFIX:up") +EXPECTED=$(printf "PREFIX:hello,45218\nPREFIX:whats,46213\nPREFIX:up,51348") if [[ "$DATA_FROM_KAFKA" != "$EXPECTED" ]]; then echo "Output from Flink program does not match expected output." echo -e "EXPECTED: --$EXPECTED--"
