[FLINK-8741] [kafka] Fix incorrect user code classloader in FlinkKafkaConsumer

This commit fixes the fetcher incorrectly using the parent of the user code
classloader instead of the user code classloader itself. Since the Kafka 010 /
011 versions directly reuse the 09 fetcher code, the fix resolves the issue
for all versions.
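
As an illustration only (not code from this commit), the sketch below shows why
the distinction matters: user-defined functions such as watermark assigners are
packaged in the job jar and are only visible to the user code classloader, so
cloning them through its parent fails with a ClassNotFoundException. The class
and method names here are made up for the sketch; only InstantiationUtil.clone
is an actual Flink utility.

    import java.io.IOException;
    import java.io.Serializable;

    import org.apache.flink.util.InstantiationUtil;

    public class UserCodeClassLoaderSketch {

        // Clones a user-defined function (e.g. a watermark assigner) via
        // serialization. Passing userCodeClassLoader.getParent() here instead
        // of userCodeClassLoader fails for classes that only exist in the
        // user's job jar, because the parent classloader cannot see them.
        public static <T extends Serializable> T cloneUserFunction(
                T function, ClassLoader userCodeClassLoader)
                throws IOException, ClassNotFoundException {
            return InstantiationUtil.clone(function, userCodeClassLoader);
        }
    }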

This commit also extends the Kafka010Example so that it uses a custom
watermark assigner. The assigner is user code that must be loaded through the
user code classloader, so this allows our end-to-end tests to catch this bug.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0479d6f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0479d6f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0479d6f2

Branch: refs/heads/master
Commit: 0479d6f254c7a2dc1b7612bd57f726a13926aeb2
Parents: d63bc75
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Fri Feb 23 19:25:06 2018 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Mon Feb 26 23:12:16 2018 +0800

----------------------------------------------------------------------
 .../kafka/internal/Kafka09Fetcher.java          |  2 +-
 .../examples/kafka/Kafka010Example.java         | 49 +++++++++++++++++++-
 .../end-to-end-test/test_streaming_kafka010.sh  |  4 +-
 3 files changed, 50 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0479d6f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 5839360..dcc67d5 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -95,7 +95,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
                                watermarksPunctuated,
                                processingTimeProvider,
                                autoWatermarkInterval,
-                               userCodeClassLoader.getParent(),
+                               userCodeClassLoader,
                                consumerMetricGroup,
                                useMetrics);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0479d6f2/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
index 3fbd2b4..881aa67 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
@@ -21,16 +21,26 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
 
+import javax.annotation.Nullable;
 
 /**
 * An example that shows how to read from and write to Kafka. This will read String messages
 * from the input topic, prefix them by a configured prefix and output to the output topic.
 *
+ * <p>This example also demonstrates using a watermark assigner to generate per-partition
+ * watermarks directly in the Flink Kafka consumer. For demonstration purposes, it is assumed
+ * that the String messages are formatted as a (message,timestamp) tuple.
+ *
 * <p>Example usage:
 *     --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
  */
@@ -59,11 +69,15 @@ public class Kafka010Example {
                // make parameters available in the web interface
                env.getConfig().setGlobalJobParameters(parameterTool);
 
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
                DataStream<String> input = env
-                               .addSource(new FlinkKafkaConsumer010<>(
+                               .addSource(
+                                       new FlinkKafkaConsumer010<>(
                                                parameterTool.getRequired("input-topic"),
                                                new SimpleStringSchema(),
-                                               parameterTool.getProperties()))
+                                               parameterTool.getProperties())
+                                       .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
                                .map(new PrefixingMapper(prefix));
 
                input.addSink(
@@ -76,6 +90,9 @@ public class Kafka010Example {
        }
 
        private static class PrefixingMapper implements MapFunction<String, String> {
+
+               private static final long serialVersionUID = 1180234853172462378L;
+
                private final String prefix;
 
                public PrefixingMapper(String prefix) {
@@ -87,4 +104,32 @@ public class Kafka010Example {
                        return prefix + value;
                }
        }
+
+       /**
+        * A custom {@link AssignerWithPeriodicWatermarks} that simply assumes that the input stream
+        * records are strictly ascending.
+        *
+        * <p>Flink also ships some built-in convenience assigners, such as the
+        * {@link BoundedOutOfOrdernessTimestampExtractor} and {@link AscendingTimestampExtractor}.
+        */
+       private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<String> {
+
+               private static final long serialVersionUID = -742759155861320823L;
+
+               private long currentTimestamp = Long.MIN_VALUE;
+
+               @Override
+               public long extractTimestamp(String element, long previousElementTimestamp) {
+                       // the inputs are assumed to be of format (message,timestamp)
+                       long timestamp = Long.valueOf(element.substring(element.indexOf(",") + 1));
+                       this.currentTimestamp = timestamp;
+                       return timestamp;
+               }
+
+               @Nullable
+               @Override
+               public Watermark getCurrentWatermark() {
+                       return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
+               }
+       }
 }
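
For reference, the timestamp parsing used in extractTimestamp above can be
checked in isolation; on input "hello,45218" it yields 45218. The snippet and
its class name are illustrative only, not part of the commit:

    public class TimestampParsingCheck {

        public static void main(String[] args) {
            String element = "hello,45218";
            // same parsing expression as in CustomWatermarkExtractor above
            long timestamp = Long.valueOf(element.substring(element.indexOf(",") + 1));
            System.out.println(timestamp); // prints 45218
        }
    }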

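As a side note (not part of this commit), the javadoc above mentions Flink's
built-in convenience assigners. Under the same assumption that records are
formatted as (message,timestamp) with strictly ascending timestamps, a sketch
using the built-in AscendingTimestampExtractor instead of the custom assigner
could look like this (class name is made up for the sketch):

    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

    // Sketch only: extracts the timestamp after the comma; watermark
    // generation for ascending timestamps is handled by the base class.
    public class AscendingExtractorSketch extends AscendingTimestampExtractor<String> {

        private static final long serialVersionUID = 1L;

        @Override
        public long extractAscendingTimestamp(String element) {
            return Long.valueOf(element.substring(element.indexOf(",") + 1));
        }
    }
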
http://git-wip-us.apache.org/repos/asf/flink/blob/0479d6f2/test-infra/end-to-end-test/test_streaming_kafka010.sh
----------------------------------------------------------------------
diff --git a/test-infra/end-to-end-test/test_streaming_kafka010.sh b/test-infra/end-to-end-test/test_streaming_kafka010.sh
index dda2db5..51a570a 100755
--- a/test-infra/end-to-end-test/test_streaming_kafka010.sh
+++ b/test-infra/end-to-end-test/test_streaming_kafka010.sh
@@ -70,12 +70,12 @@ $FLINK_DIR/bin/flink run -d build-target/examples/streaming/Kafka010Example.jar
   --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest
 
 # send some data to Kafka
-echo -e "hello\nwhats\nup" | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-input
+echo -e "hello,45218\nwhats,46213\nup,51348" | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-input
 
 DATA_FROM_KAFKA=$($KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-output --from-beginning --max-messages 3 2> /dev/null)
 
 # make sure we have actual newlines in the string, not "\n"
-EXPECTED=$(printf "PREFIX:hello\nPREFIX:whats\nPREFIX:up")
+EXPECTED=$(printf "PREFIX:hello,45218\nPREFIX:whats,46213\nPREFIX:up,51348")
 if [[ "$DATA_FROM_KAFKA" != "$EXPECTED" ]]; then
   echo "Output from Flink program does not match expected output."
   echo -e "EXPECTED: --$EXPECTED--"
