[
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16676430#comment-16676430
]
ASF GitHub Bot commented on FLINK-10600:
----------------------------------------
pnowojski closed pull request #6924: [FLINK-10600] Provide End-to-end test
cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/flink-connectors/flink-connector-kafka/pom.xml
b/flink-connectors/flink-connector-kafka/pom.xml
index cfc92c00131..617f1232e35 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -52,6 +52,10 @@ under the License.
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git a/flink-dist/src/main/assemblies/bin.xml
b/flink-dist/src/main/assemblies/bin.xml
index 89228b39271..e1adab537d9 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -187,6 +187,34 @@ under the License.
</excludes>
</fileSet>
+ <!-- copy jar files of the streaming kafka examples -->
+ <fileSet>
+ <directory>../flink-examples/flink-examples-streaming-kafka/target</directory>
+ <outputDirectory>examples/streaming</outputDirectory>
+ <fileMode>0644</fileMode>
+ <includes>
+ <include>*.jar</include>
+ </includes>
+ <excludes>
+ <exclude>flink-examples-streaming-kafka*.jar</exclude>
+ <exclude>original-*.jar</exclude>
+ </excludes>
+ </fileSet>
+
+ <!-- copy jar files of the streaming kafka 0.10 examples -->
+ <fileSet>
+ <directory>../flink-examples/flink-examples-streaming-kafka-0.10/target</directory>
+ <outputDirectory>examples/streaming</outputDirectory>
+ <fileMode>0644</fileMode>
+ <includes>
+ <include>*.jar</include>
+ </includes>
+ <excludes>
+ <exclude>flink-examples-streaming-kafka*.jar</exclude>
+ <exclude>original-*.jar</exclude>
+ </excludes>
+ </fileSet>
+
+
<!-- copy jar files of the gelly examples -->
<fileSet>
<directory>../flink-libraries/flink-gelly-examples/target</directory>
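(A note on the two new fileSets above: they are meant to ship only the shaded example jars. As a rough sketch of why both exclude patterns are needed — the jar names below are illustrative, since the exact set depends on the shade-plugin configuration — a mvn package of flink-examples-streaming-kafka would leave roughly:

    # illustrative contents of flink-examples/flink-examples-streaming-kafka/target/
    KafkaExample.jar                                  # shaded fat jar (<finalName>), the one copied to examples/streaming
    flink-examples-streaming-kafka-1.8-SNAPSHOT.jar   # thin module jar, caught by flink-examples-streaming-kafka*.jar
    original-KafkaExample.jar                         # hypothetical unshaded backup jar, caught by original-*.jar

so only the fat jar built by the shade plugin ends up in the distribution.)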
diff --git a/flink-end-to-end-tests/run-pre-commit-tests.sh
b/flink-end-to-end-tests/run-pre-commit-tests.sh
index 15b9b152f77..3e3494a491a 100755
--- a/flink-end-to-end-tests/run-pre-commit-tests.sh
+++ b/flink-end-to-end-tests/run-pre-commit-tests.sh
@@ -54,6 +54,7 @@ run_test "Batch Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/
run_test "Streaming Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_python_wordcount.sh"
run_test "Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh"
run_test "Kafka end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka010.sh"
+run_test "Modern Kafka end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka.sh"
run_test "class loading end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_classloader.sh"
run_test "Shaded Hadoop S3A end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_hadoop_s3a.sh"
run_test "Shaded Presto S3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_presto_s3.sh"
diff --git a/flink-end-to-end-tests/test-scripts/kafka-common.sh
b/flink-end-to-end-tests/test-scripts/kafka-common.sh
index 2dc58f7305a..dedfe5208b1 100644
--- a/flink-end-to-end-tests/test-scripts/kafka-common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka-common.sh
@@ -17,6 +17,8 @@
# limitations under the License.
################################################################################
+set -e
+set -u
set -o pipefail
if [[ -z $TEST_DATA_DIR ]]; then
@@ -24,15 +26,19 @@ if [[ -z $TEST_DATA_DIR ]]; then
exit 1
fi
-KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
-CONFLUENT_DIR=$TEST_DATA_DIR/confluent-3.2.0
+KAFKA_VERSION="$1"
+CONFLUENT_VERSION="$2"
+CONFLUENT_MAJOR_VERSION="$3"
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-$KAFKA_VERSION
+CONFLUENT_DIR=$TEST_DATA_DIR/confluent-$CONFLUENT_VERSION
SCHEMA_REGISTRY_PORT=8082
SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT}
function setup_kafka_dist {
# download Kafka
mkdir -p $TEST_DATA_DIR
- KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
+ KAFKA_URL="https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.11-$KAFKA_VERSION.tgz"
echo "Downloading Kafka from $KAFKA_URL"
curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
@@ -46,7 +52,7 @@ function setup_kafka_dist {
function setup_confluent_dist {
# download confluent
mkdir -p $TEST_DATA_DIR
- CONFLUENT_URL="http://packages.confluent.io/archive/3.2/confluent-oss-3.2.0-2.11.tar.gz"
+ CONFLUENT_URL="http://packages.confluent.io/archive/$CONFLUENT_MAJOR_VERSION/confluent-oss-$CONFLUENT_VERSION-2.11.tar.gz"
echo "Downloading confluent from $CONFLUENT_URL"
curl "$CONFLUENT_URL" > $TEST_DATA_DIR/confluent.tgz
@@ -76,13 +82,15 @@ function stop_kafka_cluster {
$KAFKA_DIR/bin/kafka-server-stop.sh
$KAFKA_DIR/bin/zookeeper-server-stop.sh
- PIDS=$(jps -vl | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
+ # Terminate Kafka process if it still exists
+ PIDS=$(jps -vl | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}' || echo "")
if [ ! -z "$PIDS" ]; then
kill -s TERM $PIDS
fi
- PIDS=$(jps -vl | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')
+ # Terminate QuorumPeerMain process if it still exists
+ PIDS=$(jps -vl | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}' || echo "")
if [ ! -z "$PIDS" ]; then
kill -s TERM $PIDS
@@ -120,14 +128,7 @@ function get_partition_end_offset {
local topic=$1
local partition=$2
- # first, use the console consumer to produce a dummy consumer group
- read_messages_from_kafka 0 $topic dummy-consumer
-
- # then use the consumer offset utility to get the LOG_END_OFFSET value for the specified partition
- $KAFKA_DIR/bin/kafka-consumer-groups.sh --describe --group dummy-consumer --bootstrap-server localhost:9092 2> /dev/null \
- | grep "$topic \+$partition" \
- | tr -s " " \
- | cut -d " " -f 4
+ $KAFKA_DIR/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic $topic --partitions $partition --time -1 | cut -d ":" -f 3
}
function start_confluent_schema_registry {
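(The rewritten get_partition_end_offset no longer needs the dummy consumer group: GetOffsetShell with --time -1 prints the log-end offset of each requested partition as a topic:partition:offset triple, and the trailing cut keeps the third ":"-separated field. A minimal sketch of the parsing, with an illustrative output line:

    # GetOffsetShell prints e.g. "test-input:1:4" for partition 1 of test-input;
    # cutting field 3 on ":" yields the end offset.
    echo "test-input:1:4" | cut -d ":" -f 3    # prints 4

The `|| echo ""` additions in stop_kafka_cluster serve a related purpose: under the new `set -e` and `set -o pipefail`, a grep that matches nothing would otherwise abort the whole script.)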
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh
b/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh
new file mode 100755
index 00000000000..c5cdfde3dc0
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -e
+set -u
+set -o pipefail
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh 2.0.0 5.0.0 5.0
+
+source "$(dirname "$0")"/test_streaming_kafka_common.sh $FLINK_DIR/examples/streaming/KafkaExample.jar
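(Note the arguments on the source lines: in bash, arguments passed to `source` become the positional parameters $1..$n inside the sourced file, which is how kafka-common.sh picks up KAFKA_VERSION, CONFLUENT_VERSION and CONFLUENT_MAJOR_VERSION, and how test_streaming_kafka_common.sh receives the example jar. A minimal sketch of the mechanism, with hypothetical file names:

    # lib.sh
    VERSION="$1"              # first argument given to "source"
    # caller.sh
    source lib.sh 2.0.0       # inside lib.sh, $1 is "2.0.0"
    echo "$VERSION"           # prints 2.0.0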
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
index c9cc19db71d..ecd651448a9 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
@@ -18,96 +18,7 @@
################################################################################
source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka-common.sh
+source "$(dirname "$0")"/kafka-common.sh 0.10.2.0 3.2.0 3.2
-setup_kafka_dist
-start_kafka_cluster
+source "$(dirname "$0")"/test_streaming_kafka_common.sh $FLINK_DIR/examples/streaming/Kafka010Example.jar
-# modify configuration to have enough slots
-cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
-sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: 3/" $FLINK_DIR/conf/flink-conf.yaml
-
-start_cluster
-
-function test_cleanup {
- # don't call ourselves again for another signal interruption
- trap "exit -1" INT
- # don't call ourselves again for normal exit
- trap "" EXIT
-
- stop_kafka_cluster
-
- # revert our modifications to the Flink distribution
- mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-}
-trap test_cleanup INT
-trap test_cleanup EXIT
-
-# create the required topics
-create_kafka_topic 1 1 test-input
-create_kafka_topic 1 1 test-output
-
-# run the Flink job (detached mode)
-$FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \
- --input-topic test-input --output-topic test-output \
- --prefix=PREFIX \
- --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \
- --flink.partition-discovery.interval-millis 1000
-
-function verify_output {
- local expected=$(printf $1)
-
- if [[ "$2" != "$expected" ]]; then
- echo "Output from Flink program does not match expected output."
- echo -e "EXPECTED FOR KEY: --$expected--"
- echo -e "ACTUAL: --$2--"
- exit 1
- fi
-}
-
-echo "Sending messages to Kafka topic [test-input] ..."
-# send some data to Kafka
-send_messages_to_kafka "elephant,5,45218\nsquirrel,12,46213\nbee,3,51348\nsquirrel,22,52444\nbee,10,53412\nelephant,9,54867" test-input
-
-echo "Verifying messages from Kafka topic [test-output] ..."
-
-KEY_1_MSGS=$(read_messages_from_kafka 6 test-output elephant_consumer | grep elephant)
-KEY_2_MSGS=$(read_messages_from_kafka 6 test-output squirrel_consumer | grep squirrel)
-KEY_3_MSGS=$(read_messages_from_kafka 6 test-output bee_consumer | grep bee)
-
-# check all keys; make sure we have actual newlines in the string, not "\n"
-verify_output "elephant,5,45218\nelephant,14,54867" "$KEY_1_MSGS"
-verify_output "squirrel,12,46213\nsquirrel,34,52444" "$KEY_2_MSGS"
-verify_output "bee,3,51348\nbee,13,53412" "$KEY_3_MSGS"
-
-# now, we add a new partition to the topic
-echo "Repartitioning Kafka topic [test-input] ..."
-modify_num_partitions test-input 2
-
-if (( $(get_num_partitions test-input) != 2 )); then
- echo "Failed adding a partition to test-input topic."
- exit 1
-fi
-
-# send some more messages to Kafka
-echo "Sending more messages to Kafka topic [test-input] ..."
-send_messages_to_kafka "elephant,13,64213\ngiraffe,9,65555\nbee,5,65647\nsquirrel,18,66413" test-input
-
-# verify that our assumption that the new partition actually has written messages is correct
-if (( $(get_partition_end_offset test-input 1) == 0 )); then
- echo "The newly created partition does not have any new messages, and
therefore partition discovery cannot be verified."
- exit 1
-fi
-
-# all new messages should have been consumed, and has produced correct output
-echo "Verifying messages from Kafka topic [test-output] ..."
-
-KEY_1_MSGS=$(read_messages_from_kafka 4 test-output elephant_consumer | grep elephant)
-KEY_2_MSGS=$(read_messages_from_kafka 4 test-output squirrel_consumer | grep squirrel)
-KEY_3_MSGS=$(read_messages_from_kafka 4 test-output bee_consumer | grep bee)
-KEY_4_MSGS=$(read_messages_from_kafka 10 test-output giraffe_consumer | grep giraffe)
-
-verify_output "elephant,27,64213" "$KEY_1_MSGS"
-verify_output "squirrel,52,66413" "$KEY_2_MSGS"
-verify_output "bee,18,65647" "$KEY_3_MSGS"
-verify_output "giraffe,9,65555" "$KEY_4_MSGS"
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka_common.sh
b/flink-end-to-end-tests/test-scripts/test_streaming_kafka_common.sh
new file mode 100644
index 00000000000..ff3adc158c7
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka_common.sh
@@ -0,0 +1,117 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -e
+set -u
+set -o pipefail
+
+KAFKA_EXAMPLE_JAR="$1"
+
+setup_kafka_dist
+start_kafka_cluster
+
+# modify configuration to have enough slots
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: 3/" $FLINK_DIR/conf/flink-conf.yaml
+
+start_cluster
+
+function test_cleanup {
+ # don't call ourselves again for another signal interruption
+ trap "exit -1" INT
+ # don't call ourselves again for normal exit
+ trap "" EXIT
+
+ stop_kafka_cluster
+
+ # revert our modifications to the Flink distribution
+ mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+# create the required topics
+create_kafka_topic 1 1 test-input
+create_kafka_topic 1 1 test-output
+
+# run the Flink job (detached mode)
+$FLINK_DIR/bin/flink run -d $KAFKA_EXAMPLE_JAR \
+ --input-topic test-input --output-topic test-output \
+ --prefix=PREFIX \
+ --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \
+ --transaction.timeout.ms 900000 \
+ --flink.partition-discovery.interval-millis 1000
+
+function verify_output {
+ local expected=$(printf $1)
+
+ if [[ "$2" != "$expected" ]]; then
+ echo "Output from Flink program does not match expected output."
+ echo -e "EXPECTED FOR KEY: --$expected--"
+ echo -e "ACTUAL: --$2--"
+ exit 1
+ fi
+}
+
+echo "Sending messages to Kafka topic [test-input] ..."
+# send some data to Kafka
+send_messages_to_kafka "elephant,5,45218\nsquirrel,12,46213\nbee,3,51348\nsquirrel,22,52444\nbee,10,53412\nelephant,9,54867" test-input
+
+echo "Verifying messages from Kafka topic [test-output] ..."
+
+KEY_1_MSGS=$(read_messages_from_kafka 6 test-output elephant_consumer | grep elephant)
+KEY_2_MSGS=$(read_messages_from_kafka 6 test-output squirrel_consumer | grep squirrel)
+KEY_3_MSGS=$(read_messages_from_kafka 6 test-output bee_consumer | grep bee)
+
+# check all keys; make sure we have actual newlines in the string, not "\n"
+verify_output "elephant,5,45218\nelephant,14,54867" "$KEY_1_MSGS"
+verify_output "squirrel,12,46213\nsquirrel,34,52444" "$KEY_2_MSGS"
+verify_output "bee,3,51348\nbee,13,53412" "$KEY_3_MSGS"
+
+# now, we add a new partition to the topic
+echo "Repartitioning Kafka topic [test-input] ..."
+modify_num_partitions test-input 2
+
+if (( $(get_num_partitions test-input) != 2 )); then
+ echo "Failed adding a partition to test-input topic."
+ exit 1
+fi
+
+# send some more messages to Kafka
+echo "Sending more messages to Kafka topic [test-input] ..."
+send_messages_to_kafka "elephant,13,64213\ngiraffe,9,65555\nbee,5,65647\nsquirrel,18,66413" test-input
+
+# verify that our assumption that the new partition actually has written messages is correct
+if (( $(get_partition_end_offset test-input 1) == 0 )); then
+ echo "The newly created partition does not have any new messages, and
therefore partition discovery cannot be verified."
+ exit 1
+fi
+
+# all new messages should have been consumed and should have produced correct output
+echo "Verifying messages from Kafka topic [test-output] ..."
+
+KEY_1_MSGS=$(read_messages_from_kafka 4 test-output elephant_consumer | grep elephant)
+KEY_2_MSGS=$(read_messages_from_kafka 4 test-output squirrel_consumer | grep squirrel)
+KEY_3_MSGS=$(read_messages_from_kafka 4 test-output bee_consumer | grep bee)
+KEY_4_MSGS=$(read_messages_from_kafka 10 test-output giraffe_consumer | grep giraffe)
+
+verify_output "elephant,27,64213" "$KEY_1_MSGS"
+verify_output "squirrel,52,66413" "$KEY_2_MSGS"
+verify_output "bee,18,65647" "$KEY_3_MSGS"
+verify_output "giraffe,9,65555" "$KEY_4_MSGS"
diff --git a/flink-examples/flink-examples-streaming-kafka-0.10/pom.xml
b/flink-examples/flink-examples-streaming-kafka-0.10/pom.xml
new file mode 100644
index 00000000000..ced3919da3e
--- /dev/null
+++ b/flink-examples/flink-examples-streaming-kafka-0.10/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-examples</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.8-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-examples-streaming-kafka-0.10</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-examples-streaming-kafka-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Use the shade plugin to build a fat jar for the kafka connector test -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>fat-jar-kafka-example</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadeTestJar>false</shadeTestJar>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.streaming.examples.kafka.Kafka010Example</mainClass>
+ </transformer>
+ </transformers>
+ <finalName>Kafka010Example</finalName>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/flink-examples/flink-examples-streaming-kafka-0.10/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
b/flink-examples/flink-examples-streaming-kafka-0.10/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
new file mode 100644
index 00000000000..2df1f5d8496
--- /dev/null
+++
b/flink-examples/flink-examples-streaming-kafka-0.10/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
+import org.apache.flink.streaming.examples.kafka.base.CustomWatermarkExtractor;
+import org.apache.flink.streaming.examples.kafka.base.KafkaEvent;
+import org.apache.flink.streaming.examples.kafka.base.KafkaEventSchema;
+import org.apache.flink.streaming.examples.kafka.base.KafkaExampleUtil;
+import org.apache.flink.streaming.examples.kafka.base.RollingAdditionMapper;
+
+/**
+ * A simple example that shows how to read from and write to Kafka. This will read String messages
+ * from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key, and finally
+ * perform a rolling addition on each key for which the results are written back to another topic.
+ *
+ * <p>This example also demonstrates using a watermark assigner to generate per-partition
+ * watermarks directly in the Flink Kafka consumer. For demonstration purposes, it is assumed that
+ * the String messages are formatted as a (word,frequency,timestamp) tuple.
+ *
+ * <p>Example usage:
+ * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
+ */
+public class Kafka010Example {
+
+ public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+ StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);
+
+ DataStream<KafkaEvent> input = env
+ .addSource(
+ new FlinkKafkaConsumer010<>(
+ parameterTool.getRequired("input-topic"),
+ new KafkaEventSchema(),
+ parameterTool.getProperties())
+ .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
+ .keyBy("word")
+ .map(new RollingAdditionMapper());
+
+ input.addSink(
+ new FlinkKafkaProducer010<>(
+ parameterTool.getRequired("output-topic"),
+ new KafkaEventSchema(),
+ parameterTool.getProperties()));
+
+ env.execute("Kafka 0.10 Example");
+ }
+
+}
diff --git
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
b/flink-examples/flink-examples-streaming-kafka-0.10/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
similarity index 97%
rename from
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
rename to
flink-examples/flink-examples-streaming-kafka-0.10/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
index 9f4fdc4c294..c2ea5617a68 100644
---
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
+++
b/flink-examples/flink-examples-streaming-kafka-0.10/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.scala.examples.kafka
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
/**
diff --git a/flink-examples/flink-examples-streaming-kafka-base/pom.xml
b/flink-examples/flink-examples-streaming-kafka-base/pom.xml
new file mode 100644
index 00000000000..3f389f843a1
--- /dev/null
+++ b/flink-examples/flink-examples-streaming-kafka-base/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-examples</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.8-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-examples-streaming-kafka-base</artifactId>
+ <name>flink-examples-streaming-kafka-base</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/CustomWatermarkExtractor.java
b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/CustomWatermarkExtractor.java
new file mode 100644
index 00000000000..51de582dc0f
--- /dev/null
+++
b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/CustomWatermarkExtractor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka.base;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+
+/**
+ * A custom {@link AssignerWithPeriodicWatermarks}, that simply assumes that the input stream
+ * records are strictly ascending.
+ *
+ * <p>Flink also ships some built-in convenience assigners, such as the
+ * {@link BoundedOutOfOrdernessTimestampExtractor} and {@link AscendingTimestampExtractor}
+ */
+public class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<KafkaEvent> {
+
+ private static final long serialVersionUID = -742759155861320823L;
+
+ private long currentTimestamp = Long.MIN_VALUE;
+
+ @Override
+ public long extractTimestamp(KafkaEvent event, long previousElementTimestamp) {
+ // the inputs are assumed to be of format (message,timestamp)
+ this.currentTimestamp = event.getTimestamp();
+ return event.getTimestamp();
+ }
+
+ @Nullable
+ @Override
+ public Watermark getCurrentWatermark() {
+ return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
+ }
+}
diff --git
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaEvent.java
similarity index 97%
rename from
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
rename to
flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaEvent.java
index a144fc38257..7a8f84f8ca8 100644
---
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
+++
b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaEvent.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.examples.kafka;
+package org.apache.flink.streaming.examples.kafka.base;
/**
* The event type used in the {@link Kafka010Example}.
diff --git
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaEventSchema.java
similarity index 97%
rename from
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
rename to
flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaEventSchema.java
index 5b8e17dadca..ea9c12b6056 100644
---
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
+++
b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaEventSchema.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.examples.kafka;
+package org.apache.flink.streaming.examples.kafka.base;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
diff --git
a/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaExampleUtil.java
b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaExampleUtil.java
new file mode 100644
index 00000000000..447dec24648
--- /dev/null
+++
b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaExampleUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka.base;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * The util class for the Kafka examples.
+ */
+public class KafkaExampleUtil {
+
+ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool parameterTool)
+ throws Exception {
+
+ if (parameterTool.getNumberOfParameters() < 5) {
+ System.out.println("Missing parameters!\n" +
+ "Usage: Kafka --input-topic <topic> --output-topic <topic> " +
+ "--bootstrap.servers <kafka brokers> " +
+ "--zookeeper.connect <zk quorum> --group.id <some id>");
+ throw new Exception("Missing parameters!\n" +
+ "Usage: Kafka --input-topic <topic> --output-topic <topic> " +
+ "--bootstrap.servers <kafka brokers> " +
+ "--zookeeper.connect <zk quorum> --group.id <some id>");
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+ env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
+ env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
+ env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ return env;
+ }
+
+}
diff --git
a/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/RollingAdditionMapper.java
b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/RollingAdditionMapper.java
new file mode 100644
index 00000000000..e71f86cefba
--- /dev/null
+++
b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/RollingAdditionMapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka.base;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A {@link RichMapFunction} that continuously outputs the current total frequency count of a key.
+ * The current total count is keyed state managed by Flink.
+ */
+public class RollingAdditionMapper extends RichMapFunction<KafkaEvent, KafkaEvent> {
+
+ private static final long serialVersionUID = 1180234853172462378L;
+
+ private transient ValueState<Integer> currentTotalCount;
+
+ @Override
+ public KafkaEvent map(KafkaEvent event) throws Exception {
+ Integer totalCount = currentTotalCount.value();
+
+ if (totalCount == null) {
+ totalCount = 0;
+ }
+ totalCount += event.getFrequency();
+
+ currentTotalCount.update(totalCount);
+
+ return new KafkaEvent(event.getWord(), totalCount, event.getTimestamp());
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ currentTotalCount = getRuntimeContext().getState(new ValueStateDescriptor<>("currentTotalCount", Integer.class));
+ }
+}
diff --git a/flink-examples/flink-examples-streaming-kafka/pom.xml
b/flink-examples/flink-examples-streaming-kafka/pom.xml
new file mode 100644
index 00000000000..0434460e981
--- /dev/null
+++ b/flink-examples/flink-examples-streaming-kafka/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-examples</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.8-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-examples-streaming-kafka</artifactId>
+ <name>flink-examples-streaming-kafka</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-examples-streaming-kafka-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Use the shade plugin to build a fat jar for the kafka connector test -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>fat-jar-kafka-example</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadeTestJar>false</shadeTestJar>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.streaming.examples.kafka.KafkaExample</mainClass>
+ </transformer>
+ </transformers>
+ <finalName>KafkaExample</finalName>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/flink-examples/flink-examples-streaming-kafka/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaExample.java
b/flink-examples/flink-examples-streaming-kafka/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaExample.java
new file mode 100644
index 00000000000..27e73d18c08
--- /dev/null
+++
b/flink-examples/flink-examples-streaming-kafka/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaExample.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.examples.kafka.base.CustomWatermarkExtractor;
+import org.apache.flink.streaming.examples.kafka.base.KafkaEvent;
+import org.apache.flink.streaming.examples.kafka.base.KafkaEventSchema;
+import org.apache.flink.streaming.examples.kafka.base.KafkaExampleUtil;
+import org.apache.flink.streaming.examples.kafka.base.RollingAdditionMapper;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+
+/**
+ * A simple example that shows how to read from and write to modern Kafka. This will read String messages
+ * from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key, and finally
+ * perform a rolling addition on each key for which the results are written back to another topic.
+ *
+ * <p>This example also demonstrates using a watermark assigner to generate per-partition
+ * watermarks directly in the Flink Kafka consumer. For demonstration purposes, it is assumed that
+ * the String messages are formatted as a (word,frequency,timestamp) tuple.
+ *
+ * <p>Example usage:
+ * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092
+ * --zookeeper.connect localhost:2181 --group.id myconsumer
+ */
+public class KafkaExample extends KafkaExampleUtil {
+
+ public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+ StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);
+
+ DataStream<KafkaEvent> input = env
+ .addSource(
+ new FlinkKafkaConsumer<>(
+ parameterTool.getRequired("input-topic"),
+ new KafkaEventSchema(),
+ parameterTool.getProperties())
+ .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
+ .keyBy("word")
+ .map(new RollingAdditionMapper());
+
+ input.addSink(
+ new FlinkKafkaProducer<>(
+ parameterTool.getRequired("output-topic"),
+ new KeyedSerializationSchemaWrapper<>(new KafkaEventSchema()),
+ parameterTool.getProperties(),
+ FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
+
+ env.execute("Modern Kafka Example");
+ }
+
+}
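(Unlike the 0.10 example, this one writes with FlinkKafkaProducer.Semantic.EXACTLY_ONCE, which is implemented on top of Kafka transactions. That is why test_streaming_kafka_common.sh passes --transaction.timeout.ms 900000: the producer's transaction timeout must not exceed the broker's transaction.max.timeout.ms, which defaults to 15 minutes. A sketch of the flag as the test forwards it into the producer properties:

    # forwarded via ParameterTool.getProperties() into the Kafka producer config
    --transaction.timeout.ms 900000    # 15 minutes, matching the broker-side default maximum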
diff --git a/flink-examples/flink-examples-streaming/pom.xml
b/flink-examples/flink-examples-streaming/pom.xml
index 180a6cbe9d3..30347abfcb7 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -531,43 +531,6 @@ under the License.
</configuration>
</execution>
- <execution>
- <id>fat-jar-kafka-010-example</id>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <shadeTestJar>false</shadeTestJar>
- <shadedArtifactAttached>false</shadedArtifactAttached>
- <createDependencyReducedPom>false</createDependencyReducedPom>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.flink.streaming.examples.kafka.Kafka010Example</mainClass>
- </transformer>
- </transformers>
- <finalName>Kafka010Example</finalName>
- <!-- <outputFile>Kafka.jar</outputFile> -->
- <filters>
- <filter>
- <artifact>*</artifact>
- <includes>
- <include>org/apache/flink/streaming/examples/kafka/**</include>
- <include>org/apache/flink/streaming/**</include>
- <include>org/apache/kafka/**</include>
- <include>org/apache/curator/**</include>
- <include>org/apache/zookeeper/**</include>
- <include>org/apache/jute/**</include>
- <include>org/I0Itec/**</include>
- <include>jline/**</include>
- <include>com/yammer/**</include>
- <include>kafka/**</include>
- </includes>
- </filter>
- </filters>
- </configuration>
- </execution>
-
<execution>
<id>fat-jar-twitter-example</id>
<phase>package</phase>
diff --git
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
deleted file mode 100644
index 62bfd4fc6c5..00000000000
---
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.kafka;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
-import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
-
-import javax.annotation.Nullable;
-
-/**
- * A simple example that shows how to read from and write to Kafka. This will read String messages
- * from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key, and finally
- * perform a rolling addition on each key for which the results are written back to another topic.
- *
- * <p>This example also demonstrates using a watermark assigner to generate per-partition
- * watermarks directly in the Flink Kafka consumer. For demonstration purposes, it is assumed that
- * the String messages are of formatted as a (word,frequency,timestamp) tuple.
- *
- * <p>Example usage:
- * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
- */
-public class Kafka010Example {
-
- public static void main(String[] args) throws Exception {
- // parse input arguments
- final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
- if (parameterTool.getNumberOfParameters() < 5) {
- System.out.println("Missing parameters!\n" +
- "Usage: Kafka --input-topic <topic> --output-topic <topic> " +
- "--bootstrap.servers <kafka brokers> " +
- "--zookeeper.connect <zk quorum> --group.id <some id>");
- return;
- }
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().disableSysoutLogging();
- env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
- env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
- env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- DataStream<KafkaEvent> input = env
- .addSource(
- new FlinkKafkaConsumer010<>(
- parameterTool.getRequired("input-topic"),
- new KafkaEventSchema(),
- parameterTool.getProperties())
- .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
- .keyBy("word")
- .map(new RollingAdditionMapper());
-
- input.addSink(
- new FlinkKafkaProducer010<>(
- parameterTool.getRequired("output-topic"),
- new KafkaEventSchema(),
- parameterTool.getProperties()));
-
- env.execute("Kafka 0.10 Example");
- }
-
- /**
- * A {@link RichMapFunction} that continuously outputs the current total frequency count of a key.
- * The current total count is keyed state managed by Flink.
- */
- private static class RollingAdditionMapper extends RichMapFunction<KafkaEvent, KafkaEvent> {
-
- private static final long serialVersionUID = 1180234853172462378L;
-
- private transient ValueState<Integer> currentTotalCount;
-
- @Override
- public KafkaEvent map(KafkaEvent event) throws Exception {
- Integer totalCount = currentTotalCount.value();
-
- if (totalCount == null) {
- totalCount = 0;
- }
- totalCount += event.getFrequency();
-
- currentTotalCount.update(totalCount);
-
- return new KafkaEvent(event.getWord(), totalCount, event.getTimestamp());
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- currentTotalCount = getRuntimeContext().getState(new ValueStateDescriptor<>("currentTotalCount", Integer.class));
- }
- }
-
- /**
- * A custom {@link AssignerWithPeriodicWatermarks}, that simply assumes that the input stream
- * records are strictly ascending.
- *
- * <p>Flink also ships some built-in convenience assigners, such as the
- * {@link BoundedOutOfOrdernessTimestampExtractor} and {@link AscendingTimestampExtractor}
- */
- private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<KafkaEvent> {
-
- private static final long serialVersionUID = -742759155861320823L;
-
- private long currentTimestamp = Long.MIN_VALUE;
-
- @Override
- public long extractTimestamp(KafkaEvent event, long previousElementTimestamp) {
- // the inputs are assumed to be of format (message,timestamp)
- this.currentTimestamp = event.getTimestamp();
- return event.getTimestamp();
- }
-
- @Nullable
- @Override
- public Watermark getCurrentWatermark() {
- return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
- }
- }
-}
diff --git a/flink-examples/pom.xml b/flink-examples/pom.xml
index 165130d3bdf..0905d01ed3b 100644
--- a/flink-examples/pom.xml
+++ b/flink-examples/pom.xml
@@ -35,6 +35,9 @@ under the License.
<modules>
<module>flink-examples-batch</module>
<module>flink-examples-streaming</module>
+ <module>flink-examples-streaming-kafka-base</module>
+ <module>flink-examples-streaming-kafka</module>
+ <module>flink-examples-streaming-kafka-0.10</module>
<module>flink-examples-table</module>
</modules>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Provide End-to-end test cases for modern Kafka connectors
> ---------------------------------------------------------
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
> Issue Type: Sub-task
> Components: Kafka Connector
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)