yanghua closed pull request #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors
URL: https://github.com/apache/flink/pull/6577
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index b4416e8a209..73a331e6194 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -193,7 +193,7 @@
/**
* User defined properties for the Producer.
*/
- private final Properties producerConfig;
+ protected final Properties producerConfig;
/**
* The name of the default topic this producer is writing data to.
@@ -239,7 +239,7 @@
/**
* Semantic chosen for this instance.
*/
- private Semantic semantic;
+ protected Semantic semantic;
// -------------------------------- Runtime fields ------------------------------------------
@@ -893,6 +893,10 @@ protected void finishRecoveringContext() {
LOG.info("Recovered transactionalIds {}", getUserContext().get().transactionalIds);
}
+ protected FlinkKafkaProducer createProducer() {
+ return new FlinkKafkaProducer<>(this.producerConfig);
+ }
+
/**
* After initialization make sure that all previous transactions from the current user context have been completed.
*/
@@ -958,7 +962,7 @@ private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> pro
}
private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
- FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(this.producerConfig);
+ FlinkKafkaProducer<byte[], byte[]> producer = createProducer();
RuntimeContext ctx = getRuntimeContext();
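The changes above widen producerConfig and semantic to protected and route initProducer() through a new protected createProducer() hook, so a subclass can supply a version-specific internal producer without copying the whole sink. A minimal sketch of the intended extension pattern; the subclass name and the client.id tweak are illustrative only, while the concrete override added by this PR is FlinkKafkaProducer10 further down:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Sketch only: shows how the new createProducer() hook can be overridden.
public class MyKafkaProducer<IN> extends FlinkKafkaProducer011<IN> {

	public MyKafkaProducer(String topicId, KeyedSerializationSchema<IN> schema, Properties producerConfig) {
		super(topicId, schema, producerConfig);
	}

	@Override
	protected FlinkKafkaProducer createProducer() {
		// producerConfig is visible here because the field is now protected.
		// A version-specific connector would typically return its own
		// FlinkKafkaProducer subclass here (see FlinkKafkaProducer10 below);
		// this sketch just customizes the config before delegating.
		Properties config = new Properties();
		config.putAll(this.producerConfig);
		config.setProperty("client.id", "my-kafka-producer"); // illustrative tweak
		return new FlinkKafkaProducer<>(config);
	}
}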
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index 8faff38749f..2f47bf15a62 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -106,10 +106,10 @@
public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
- private final KafkaProducer<K, V> kafkaProducer;
+ protected final KafkaProducer<K, V> kafkaProducer;
@Nullable
- private final String transactionalId;
+ protected final String transactionalId;
public FlinkKafkaProducer(Properties properties) {
transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
@@ -257,7 +257,7 @@ private TransactionalRequestResult enqueueNewPartitions() {
}
}
- private static Enum<?> getEnum(String enumFullName) {
+ protected static Enum<?> getEnum(String enumFullName) {
String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
if (x.length == 2) {
String enumClassName = x[0];
@@ -272,7 +272,7 @@ private TransactionalRequestResult enqueueNewPartitions() {
return null;
}
- private static Object invoke(Object object, String methodName, Object... args) {
+ protected static Object invoke(Object object, String methodName, Object... args) {
Class<?>[] argTypes = new Class[args.length];
for (int i = 0; i < args.length; i++) {
argTypes[i] = args[i].getClass();
@@ -290,7 +290,7 @@ private static Object invoke(Object object, String methodName, Class<?>[] argTyp
}
}
- private static Object getValue(Object object, String fieldName) {
+ protected static Object getValue(Object object, String fieldName) {
return getValue(object, object.getClass(), fieldName);
}
@@ -304,7 +304,7 @@ private static Object getValue(Object object, Class<?> clazz, String fieldName)
}
}
- private static void setValue(Object object, String fieldName, Object value) {
+ protected static void setValue(Object object, String fieldName, Object value) {
try {
Field field = object.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
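Widening the reflection helpers (getEnum, invoke, getValue, setValue) and the kafkaProducer/transactionalId fields from private to protected lets version-specific subclasses reuse the transaction machinery, as FlinkKafka10Producer does later in this diff. A minimal, hypothetical sketch of a subclass reading the client's internal transaction state through the protected members; the internal field names ("transactionManager", "currentState") come from the Kafka client referenced by this PR and may differ between Kafka versions:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: assumes a transactional.id is configured, otherwise the
// transactionManager field of the wrapped KafkaProducer is null.
public class IntrospectingFlinkKafkaProducer<K, V> extends FlinkKafkaProducer<K, V> {

	private static final Logger LOG = LoggerFactory.getLogger(IntrospectingFlinkKafkaProducer.class);

	public IntrospectingFlinkKafkaProducer(Properties properties) {
		super(properties);
	}

	/** Logs the transactional id together with the client's internal transaction state. */
	public void logTransactionState() {
		// kafkaProducer and the getValue() helper are accessible because they are now protected.
		Object transactionManager = getValue(kafkaProducer, "transactionManager");
		Object currentState = getValue(transactionManager, "currentState");
		LOG.info("transactionalId={}, state={}", transactionalId, currentState);
	}
}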
diff --git a/flink-connectors/flink-connector-kafka-1.0/pom.xml b/flink-connectors/flink-connector-kafka-1.0/pom.xml
new file mode 100644
index 00000000000..b82dfefcf38
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/pom.xml
@@ -0,0 +1,315 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-connectors</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.7-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+
<artifactId>flink-connector-kafka-1.0_${scala.binary.version}</artifactId>
+ <name>flink-connector-kafka-1.0</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <kafka.version>1.0.0</kafka.version>
+ </properties>
+
+ <dependencies>
+
+ <!-- core dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+
<artifactId>kafka_${scala.binary.version}</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- streaming-java dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Add Kafka 1.0.x as a dependency -->
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <!-- Projects depending on this project, won't depend
on flink-table. -->
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${project.version}</version>
+ <!-- Projects depending on this project, won't depend
on flink-avro. -->
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ <!-- Projects depending on this project, won't depend
on flink-json. -->
+ <optional>true</optional>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <!-- exclude Kafka dependencies -->
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+
<artifactId>kafka_${scala.binary.version}</artifactId>
+ </exclusion>
+ </exclusions>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <!-- exclude Kafka dependencies -->
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+
<artifactId>kafka_${scala.binary.version}</artifactId>
+ </exclusion>
+ </exclusions>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <!-- include 1.0 server for tests -->
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>${kafka.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-tests_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-jmx</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <profiles>
+ <!-- Create SQL Client uber jars by default -->
+ <profile>
+ <id>sql-jars</id>
+ <activation>
+ <property>
+ <name>!skipSqlJars</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-shade-plugin</artifactId>
+						<executions>
+							<execution>
+								<phase>package</phase>
+								<goals>
+									<goal>shade</goal>
+								</goals>
+								<configuration>
+									<shadedArtifactAttached>true</shadedArtifactAttached>
+									<shadedClassifierName>sql-jar</shadedClassifierName>
+									<artifactSet>
+										<includes combine.children="append">
+											<include>org.apache.kafka:*</include>
+											<include>org.apache.flink:flink-connector-kafka-base_${scala.binary.version}</include>
+											<include>org.apache.flink:flink-connector-kafka-0.9_${scala.binary.version}</include>
+											<include>org.apache.flink:flink-connector-kafka-0.10_${scala.binary.version}</include>
+											<include>org.apache.flink:flink-connector-kafka-0.11_${scala.binary.version}</include>
+										</includes>
+									</artifactSet>
+									<filters>
+										<filter>
+											<artifact>*:*</artifact>
+											<excludes>
+												<exclude>kafka/kafka-version.properties</exclude>
+											</excludes>
+										</filter>
+									</filters>
+									<relocations>
+										<relocation>
+											<pattern>org.apache.kafka</pattern>
+											<shadedPattern>org.apache.flink.kafka10.shaded.org.apache.kafka</shadedPattern>
+										</relocation>
+									</relocations>
+								</configuration>
+							</execution>
+						</executions>
+					</plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <configuration>
+ <includes>
+
<include>**/KafkaTestEnvironmentImpl*</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-test-sources</id>
+ <goals>
+
<goal>test-jar-no-fork</goal>
+ </goals>
+ <configuration>
+ <includes>
+
<include>**/KafkaTestEnvironmentImpl*</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+					<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
+					<forkCount>1</forkCount>
+					<argLine>-Xms256m -Xmx2048m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer10.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer10.java
new file mode 100644
index 00000000000..e45b97846c7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer10.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel
data stream from
+ * Apache Kafka 1.0.x. The consumer can run in multiple parallel instances,
each of which will pull
+ * data from one or more Kafka partitions.
+ */
+public class FlinkKafkaConsumer10<T> extends FlinkKafkaConsumer011<T> {
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 1.0.x.
+ *
+ * @param topic The name of the topic that should be
consumed.
+ * @param valueDeserializer The de-/serializer used to convert between
Kafka's byte messages and Flink's objects.
+ * @param props
+ */
+ public FlinkKafkaConsumer10(String topic, DeserializationSchema<T>
valueDeserializer, Properties props) {
+ super(topic, valueDeserializer, props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 1.0.x
+ *
+ * <p>This constructor allows passing a {@see
KeyedDeserializationSchema} for reading key/value
+ * pairs, offsets, and topic names from Kafka.
+ *
+ * @param topic The name of the topic that should be consumed.
+ * @param deserializer The keyed de-/serializer used to convert between
Kafka's byte messages and Flink's objects.
+ * @param props
+ */
+ public FlinkKafkaConsumer10(String topic, KeyedDeserializationSchema<T>
deserializer, Properties props) {
+ super(topic, deserializer, props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 1.0.x
+ *
+ * <p>This constructor allows passing multiple topics to the consumer.
+ *
+ * @param topics The Kafka topics to read from.
+ * @param deserializer The de-/serializer used to convert between
Kafka's byte messages and Flink's objects.
+ * @param props
+ */
+ public FlinkKafkaConsumer10(List<String> topics,
DeserializationSchema<T> deserializer, Properties props) {
+ super(topics, deserializer, props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 1.0.x
+ *
+ * <p>This constructor allows passing multiple topics and a key/value
deserialization schema.
+ *
+ * @param topics The Kafka topics to read from.
+ * @param deserializer The keyed de-/serializer used to convert between
Kafka's byte messages and Flink's objects.
+ * @param props
+ */
+ public FlinkKafkaConsumer10(List<String> topics,
KeyedDeserializationSchema<T> deserializer, Properties props) {
+ super(topics, deserializer, props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 1.0.x. Use
this constructor to
+ * subscribe to multiple topics based on a regular expression pattern.
+ *
+ * <p>If partition discovery is enabled (by setting a non-negative
value for
+ * {@link FlinkKafkaConsumer10#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS}
in the properties), topics
+ * with names matching the pattern will also be subscribed to as they
are created on the fly.
+ *
+ * @param subscriptionPattern The regular expression for a pattern of
topic names to subscribe to.
+ * @param valueDeserializer The de-/serializer used to convert
between Kafka's byte messages and Flink's objects.
+ * @param props
+ */
+ public FlinkKafkaConsumer10(Pattern subscriptionPattern,
DeserializationSchema<T> valueDeserializer, Properties props) {
+ super(subscriptionPattern, valueDeserializer, props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 1.0.x. Use
this constructor to
+ * subscribe to multiple topics based on a regular expression pattern.
+ *
+ * <p>If partition discovery is enabled (by setting a non-negative
value for
+ * {@link FlinkKafkaConsumer10#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS}
in the properties), topics
+ * with names matching the pattern will also be subscribed to as they
are created on the fly.
+ *
+ * <p>This constructor allows passing a {@see
KeyedDeserializationSchema} for reading key/value
+ * pairs, offsets, and topic names from Kafka.
+ *
+ * @param subscriptionPattern The regular expression for a pattern of
topic names to subscribe to.
+ * @param deserializer The keyed de-/serializer used to convert
between Kafka's byte messages and Flink's objects.
+ * @param props
+ */
+ public FlinkKafkaConsumer10(Pattern subscriptionPattern,
KeyedDeserializationSchema<T> deserializer, Properties props) {
+ super(subscriptionPattern, deserializer, props);
+ }
+}
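For reference, a minimal sketch of how a streaming job might use the new FlinkKafkaConsumer10 (not part of this PR); the topic name, bootstrap servers, and group id are placeholders, and SimpleStringSchema stands in for whatever DeserializationSchema the job actually needs:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer10;

public class Kafka10ConsumerExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
		props.setProperty("group.id", "example-group");           // placeholder

		// Reads the topic as plain strings; swap in the schema your data requires.
		DataStream<String> stream = env.addSource(
			new FlinkKafkaConsumer10<>("example-topic", new SimpleStringSchema(), props));

		stream.print();
		env.execute("Kafka 1.0 consumer example");
	}
}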
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10.java
new file mode 100644
index 00000000000..de7e0b7b7c4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafka10Producer;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible
with Kafka 1.0.x. By default producer
+ * will use {@link FlinkKafkaProducer10.Semantic#AT_LEAST_ONCE} semantic.
+ * Before using {@link FlinkKafkaProducer10.Semantic#EXACTLY_ONCE} please
refer to Flink's
+ * Kafka connector documentation.
+ */
+public class FlinkKafkaProducer10<IN> extends FlinkKafkaProducer011<IN> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkKafkaProducer10.class);
+
+ /**
+ * Creates a FlinkKafka10Producer for a given topic. The sink produces
a DataStream to
+ * the topic.
+ *
+ * @param brokerList Comma separated addresses of the brokers
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema
+ */
+ public FlinkKafkaProducer10(String brokerList, String topicId,
SerializationSchema<IN> serializationSchema) {
+ super(brokerList, topicId, serializationSchema);
+ }
+
+ /**
+ * Creates a FlinkKafka10Producer for a given topic. The sink produces
a DataStream to
+ * the topic.
+ *
+ * <p>Using this constructor, the default
+ * {@link
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner}
will be used as
+ * the partitioner. This default partitioner maps each sink subtask to
a single Kafka
+ * partition (i.e. all records received by a sink subtask will end up
in the same
+ * Kafka partition).
+ *
+ * <p>To use a custom partitioner, please use
+ * {@link #FlinkKafkaProducer10(String, SerializationSchema,
Properties, Optional)} instead.
+ *
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined key-less serialization
schema.
+ * @param producerConfig
+ */
+ public FlinkKafkaProducer10(String topicId, SerializationSchema<IN>
serializationSchema, Properties producerConfig) {
+ super(topicId, serializationSchema, producerConfig);
+ }
+
+ /**
+ * Creates a FlinkKafka10Producer for a given topic. The sink produces
its input to
+ * the topic. It accepts a key-less {@link SerializationSchema} and
possibly a custom {@link FlinkKafkaPartitioner}.
+ *
+ * <p>Since a key-less {@link SerializationSchema} is used, all records
sent to Kafka will not have an
+ * attached key. Therefore, if a partitioner is also not provided,
records will be distributed to Kafka
+ * partitions in a round-robin fashion.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A key-less serializable serialization
schema for turning user objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning
messages to Kafka partitions.
+ * If a partitioner is not provided, records
will be distributed to Kafka partitions
+ */
+ public FlinkKafkaProducer10(
+ String topicId,
+ SerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+ super(topicId, serializationSchema, producerConfig,
customPartitioner);
+ }
+
+ /**
+ * Creates a FlinkKafka10Producer for a given topic. The sink produces
a DataStream to
+ * the topic.
+ *
+ * <p>Using this constructor, the default
+ * {@link
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner}
will be used as
+ * the partitioner. This default partitioner maps each sink subtask to
a single Kafka
+ * partition (i.e. all records received by a sink subtask will end up
in the same
+ * Kafka partition).
+ *
+ * <p>To use a custom partitioner, please use
+ * {@link #FlinkKafkaProducer10(String, KeyedSerializationSchema,
Properties, Optional)} instead.
+ *
+ * @param brokerList Comma separated addresses of the brokers
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema
+ */
+ public FlinkKafkaProducer10(
+ String brokerList,
+ String topicId,
+ KeyedSerializationSchema<IN> serializationSchema) {
+ super(brokerList, topicId, serializationSchema);
+ }
+
+ /**
+ * Creates a FlinkKafka10Producer for a given topic. The sink produces
a DataStream to
+ * the topic.
+ *
+ * <p>Using this constructor, the default
+ * {@link
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner}
will be used as
+ * the partitioner. This default partitioner maps each sink subtask to
a single Kafka
+ * partition (i.e. all records received by a sink subtask will end up
in the same
+ * Kafka partition).
+ *
+ * <p>To use a custom partitioner, please use
+ * {@link #FlinkKafkaProducer10(String, KeyedSerializationSchema,
Properties, Optional)} instead.
+ *
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined serialization schema
supporting key/value messages
+ * @param producerConfig
+ */
+ public FlinkKafkaProducer10(
+ String topicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig) {
+ super(topicId, serializationSchema, producerConfig);
+ }
+
+ /**
+ * Creates a FlinkKafka10Producer for a given topic. The sink produces
a DataStream to
+ * the topic.
+ *
+ * <p>Using this constructor, the default
+ * {@link
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner}
will be used as
+ * the partitioner. This default partitioner maps each sink subtask to
a single Kafka
+ * partition (i.e. all records received by a sink subtask will end up
in the same
+ * Kafka partition).
+ *
+ * <p>To use a custom partitioner, please use
+ * {@link #FlinkKafkaProducer10(String, KeyedSerializationSchema,
Properties, Optional, Semantic, int)} instead.
+ *
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined serialization schema
supporting key/value messages
+ * @param producerConfig Properties with the producer
configuration.
+ * @param semantic Defines semantic that will be used by
this producer (see {@link Semantic}).
+ */
+ public FlinkKafkaProducer10(
+ String topicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ Semantic semantic) {
+ super(topicId, serializationSchema, producerConfig, semantic);
+ }
+
+ /**
+ * Creates a FlinkKafka10Producer for a given topic. The sink produces
its input to
+ * the topic. It accepts a keyed {@link KeyedSerializationSchema} and
possibly a custom {@link FlinkKafkaPartitioner}.
+ *
+ * <p>If a partitioner is not provided, written records will be
partitioned by the attached key of each
+ * record (as determined by {@link
KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+ * have a key (i.e., {@link
KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+ * will be distributed to Kafka partitions in a round-robin fashion.
+ *
+ * @param defaultTopicId The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for
turning user objects into a kafka-consumable byte[] supporting key/value
messages
+ * @param producerConfig Configuration properties for the
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning
messages to Kafka partitions.
+ * If a partitioner is not provided, records
will be partitioned by the key of each record
+ * (determined by {@link
KeyedSerializationSchema#serializeKey(Object)}). If the keys
+ are {@code null}, then records will be distributed to Kafka partitions in a round-robin fashion.
+ */
+ public FlinkKafkaProducer10(
+ String defaultTopicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+ super(defaultTopicId, serializationSchema, producerConfig,
customPartitioner);
+ }
+
+ /**
+ * Creates a FlinkKafka10Producer for a given topic. The sink produces
its input to
+ * the topic. It accepts a keyed {@link KeyedSerializationSchema} and
possibly a custom {@link FlinkKafkaPartitioner}.
+ *
+ * <p>If a partitioner is not provided, written records will be
partitioned by the attached key of each
+ * record (as determined by {@link
KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+ * have a key (i.e., {@link
KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+ * will be distributed to Kafka partitions in a round-robin fashion.
+ *
+ * @param defaultTopicId The default topic to write data to
+ * @param serializationSchema A serializable serialization schema
for turning user objects into a kafka-consumable byte[] supporting key/value
messages
+ * @param producerConfig Configuration properties for the
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for
assigning messages to Kafka partitions.
+ * If a partitioner is not provided,
records will be partitioned by the key of each record
+ * (determined by {@link
KeyedSerializationSchema#serializeKey(Object)}). If the keys
+ * are {@code null}, then records will be
distributed to Kafka partitions in a
+ * round-robin fashion.
+ * @param semantic Defines semantic that will be used by
this producer (see {@link Semantic}).
+ * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool
size (see {@link Semantic#EXACTLY_ONCE}).
+ */
+ public FlinkKafkaProducer10(
+ String defaultTopicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
+ Semantic semantic,
+ int kafkaProducersPoolSize) {
+ super(defaultTopicId, serializationSchema, producerConfig,
customPartitioner, semantic, kafkaProducersPoolSize);
+ }
+
+ @Override
+ protected FlinkKafka10Producer createProducer() {
+ return new FlinkKafka10Producer(this.producerConfig);
+ }
+
+}
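A minimal sketch of wiring this producer into a job with exactly-once semantics (not part of this PR); broker address and topic are placeholders, and the transaction timeout is only an illustrative setting (the producer's transaction.timeout.ms must fit within the broker's transaction.max.timeout.ms):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer10;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class Kafka10ProducerExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(5_000); // the exactly-once sink commits transactions on checkpoints

		DataStream<String> stream = env.fromElements("a", "b", "c");

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
		props.setProperty("transaction.timeout.ms", "900000");    // illustrative value

		FlinkKafkaProducer10<String> producer = new FlinkKafkaProducer10<>(
			"example-topic",                                              // placeholder topic
			new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
			props,
			FlinkKafkaProducer10.Semantic.EXACTLY_ONCE);

		stream.addSink(producer);
		env.execute("Kafka 1.0 producer example");
	}
}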
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSink.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSink.java
new file mode 100644
index 00000000000..4f61b33bba1
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSink.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Kafka 1.0 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka10TableSink extends KafkaTableSink {
+
+ public Kafka10TableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
+ SerializationSchema<Row> serializationSchema) {
+
+ super(schema, topic, properties, partitioner,
serializationSchema);
+ }
+
+ @Override
+ protected SinkFunction<Row> createKafkaProducer(
+ String topic,
+ Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner) {
+ return new FlinkKafkaProducer10<>(
+ topic,
+ new
KeyedSerializationSchemaWrapper<>(serializationSchema),
+ properties,
+ partitioner);
+ }
+}
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSource.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSource.java
new file mode 100644
index 00000000000..2421ac764a5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSource.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 1.0.
+ */
+@Internal
+public class Kafka10TableSource extends KafkaTableSource {
+
+ /**
+ * Creates a generic Kafka {@link StreamTableSource}.
+ *
+ * @param schema Schema of the produced table.
+ * @param proctimeAttribute Field name of the processing time
attribute.
+ * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+ * @param fieldMapping Mapping for the fields of the
table schema to
+ * fields of the physical returned
type.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema for
decoding records from Kafka.
+ * @param startupMode Startup mode for the contained
consumer.
+ * @param specificStartupOffsets Specific startup offsets; only
relevant when startup
+ * mode is {@link
StartupMode#SPECIFIC_OFFSETS}.
+ */
+ public Kafka10TableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Optional<Map<String, String>> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ super(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ fieldMapping,
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets);
+ }
+
+ /**
+ * Creates a generic Kafka {@link StreamTableSource}.
+ *
+ * @param schema Schema of the produced table.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema for decoding
records from Kafka.
+ */
+ public Kafka10TableSource(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema) {
+
+ super(schema, topic, properties, deserializationSchema);
+ }
+
+ @Override
+ protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema) {
+
+ return new FlinkKafkaConsumer10<Row>(topic,
deserializationSchema, properties);
+ }
+}
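A minimal sketch of registering this table source with the Flink 1.7-era Table API (TableEnvironment.getTableEnvironment / registerTableSource); the two-column schema, topic, and flink-json based deserialization schema are illustrative assumptions, not part of this PR:

import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka10TableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Kafka10TableSourceExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

		// Illustrative two-column schema; adjust to the actual topic layout.
		String[] fieldNames = {"word", "cnt"};
		TypeInformation<?>[] fieldTypes = {Types.STRING, Types.LONG};
		TableSchema schema = new TableSchema(fieldNames, fieldTypes);

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
		props.setProperty("group.id", "table-example");           // placeholder

		Kafka10TableSource source = new Kafka10TableSource(
			schema,
			"example-topic",                                          // placeholder topic
			props,
			new JsonRowDeserializationSchema(Types.ROW_NAMED(fieldNames, fieldTypes)));

		tableEnv.registerTableSource("KafkaSource", source);
		Table result = tableEnv.sqlQuery("SELECT word, cnt FROM KafkaSource");

		tableEnv.toAppendStream(result, Row.class).print();
		env.execute("Kafka 1.0 table source example");
	}
}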
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactory.java
new file mode 100644
index 00000000000..5bfe479ccbe
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka10TableSource}.
+ */
+public class Kafka10TableSourceSinkFactory extends
KafkaTableSourceSinkFactoryBase {
+
+ @Override
+ protected String kafkaVersion() {
+ return KafkaValidator.CONNECTOR_VERSION_VALUE_10;
+ }
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return true;
+ }
+
+ @Override
+ protected KafkaTableSource createKafkaTableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ return new Kafka10TableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ Optional.of(fieldMapping),
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets);
+ }
+
+ @Override
+ protected KafkaTableSink createKafkaTableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
+ SerializationSchema<Row> serializationSchema) {
+
+ return new Kafka10TableSink(
+ schema,
+ topic,
+ properties,
+ partitioner,
+ serializationSchema);
+ }
+}
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafka10Producer.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafka10Producer.java
new file mode 100644
index 00000000000..5eca36e57a4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafka10Producer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Inner flink kafka producer.
+ */
+@PublicEvolving
+public class FlinkKafka10Producer<K, V> extends FlinkKafkaProducer<K, V> {
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkKafka10Producer.class);
+
+ public FlinkKafka10Producer(Properties properties) {
+ super(properties);
+ }
+
+ /**
+ * Instead of obtaining producerId and epoch from the transaction
coordinator, re-use previously obtained ones,
+ * so that we can resume transaction after a restart. Implementation of
this method is based on
+ * {@link KafkaProducer#initTransactions}.
+ *
https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630
+ */
+ public void resumeTransaction(long producerId, short epoch) {
+ Preconditions.checkState(producerId >= 0 && epoch >= 0,
"Incorrect values for producerId {} and epoch {}", producerId, epoch);
+ LOG.info("Attempting to resume transaction {} with producerId
{} and epoch {}", transactionalId, producerId, epoch);
+
+ Object transactionManager = getValue(kafkaProducer,
"transactionManager");
+ synchronized (transactionManager) {
+ Object nextSequence = getValue(transactionManager,
"nextSequence");
+
+ invoke(transactionManager, "transitionTo",
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+ invoke(nextSequence, "clear");
+
+ Object producerIdAndEpoch =
getValue(transactionManager, "producerIdAndEpoch");
+ setValue(producerIdAndEpoch, "producerId", producerId);
+ setValue(producerIdAndEpoch, "epoch", epoch);
+
+ invoke(transactionManager, "transitionTo",
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+ invoke(transactionManager, "transitionTo",
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+ setValue(transactionManager, "transactionStarted",
true);
+ }
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-1.0/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 00000000000..b8951b9c3ed
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka10TableSourceSinkFactory
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kafka-1.0/src/main/resources/log4j.properties
new file mode 100644
index 00000000000..6eef1747ddf
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/resources/log4j.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka10ProducerITCase.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka10ProducerITCase.java
new file mode 100644
index 00000000000..cd137f7c7bc
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka10ProducerITCase.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafka10Producer;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for our own {@link FlinkKafka10Producer}.
+ */
+@SuppressWarnings("serial")
+public class FlinkKafka10ProducerITCase extends KafkaTestBase {
+ protected String transactionalId;
+ protected Properties extraProperties;
+
+ @Before
+ public void before() {
+ transactionalId = UUID.randomUUID().toString();
+ extraProperties = new Properties();
+ extraProperties.putAll(standardProps);
+ extraProperties.put("transactional.id", transactionalId);
+ extraProperties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ extraProperties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ extraProperties.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ extraProperties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ extraProperties.put("isolation.level", "read_committed");
+ }
+
+ @Test(timeout = 30000L)
+ public void testHappyPath() throws IOException {
+ String topicName = "flink-kafka-producer-happy-path";
+ try (Producer<String, String> kafkaProducer = new
FlinkKafka10Producer<>(extraProperties)) {
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ kafkaProducer.send(new ProducerRecord<>(topicName,
"42", "42"));
+ kafkaProducer.commitTransaction();
+ }
+ assertRecord(topicName, "42", "42");
+ deleteTestTopic(topicName);
+ }
+
+ @Test(timeout = 30000L)
+ public void testResumeTransaction() throws IOException {
+ String topicName = "flink-kafka-producer-resume-transaction";
+ try (FlinkKafka10Producer<String, String> kafkaProducer = new
FlinkKafka10Producer<>(extraProperties)) {
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ kafkaProducer.send(new ProducerRecord<>(topicName,
"42", "42"));
+ kafkaProducer.flush();
+ long producerId = kafkaProducer.getProducerId();
+ short epoch = kafkaProducer.getEpoch();
+
+ try (FlinkKafka10Producer<String, String>
resumeProducer = new FlinkKafka10Producer<>(extraProperties)) {
+ resumeProducer.resumeTransaction(producerId,
epoch);
+ resumeProducer.commitTransaction();
+ }
+
+ assertRecord(topicName, "42", "42");
+
+ // this shouldn't throw - in case of network split, old
producer might attempt to commit it's transaction
+ kafkaProducer.commitTransaction();
+
+ // this shouldn't fail also, for same reason as above
+ try (FlinkKafka10Producer<String, String>
resumeProducer = new FlinkKafka10Producer<>(extraProperties)) {
+ resumeProducer.resumeTransaction(producerId,
epoch);
+ resumeProducer.commitTransaction();
+ }
+ }
+ deleteTestTopic(topicName);
+ }
+
+ private void assertRecord(String topicName, String expectedKey, String
expectedValue) {
+ try (KafkaConsumer<String, String> kafkaConsumer = new
KafkaConsumer<>(extraProperties)) {
+
kafkaConsumer.subscribe(Collections.singletonList(topicName));
+ ConsumerRecords<String, String> records =
kafkaConsumer.poll(10000);
+
+ ConsumerRecord<String, String> record =
Iterables.getOnlyElement(records);
+ assertEquals(expectedKey, record.key());
+ assertEquals(expectedValue, record.value());
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10ITCase.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10ITCase.java
new file mode 100644
index 00000000000..b178890773f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10ITCase.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+
+import kafka.server.KafkaServer;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer10}.
+ */
+public class FlinkKafkaProducer10ITCase extends KafkaTestBase {
+
+ protected String transactionalId;
+ protected Properties extraProperties;
+
+ protected TypeInformationSerializationSchema<Integer>
integerSerializationSchema =
+ new
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new
ExecutionConfig());
+ protected KeyedSerializationSchema<Integer>
integerKeyedSerializationSchema =
+ new
KeyedSerializationSchemaWrapper<>(integerSerializationSchema);
+
+ @Before
+ public void before() {
+ transactionalId = UUID.randomUUID().toString();
+ extraProperties = new Properties();
+ extraProperties.putAll(standardProps);
+ extraProperties.put("transactional.id", transactionalId);
+ extraProperties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ extraProperties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ extraProperties.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ extraProperties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ extraProperties.put("isolation.level", "read_committed");
+ }
+
+ @Test
+ public void resourceCleanUpNone() throws Exception {
+ resourceCleanUp(FlinkKafkaProducer10.Semantic.NONE);
+ }
+
+ @Test
+ public void resourceCleanUpAtLeastOnce() throws Exception {
+ resourceCleanUp(FlinkKafkaProducer10.Semantic.AT_LEAST_ONCE);
+ }
+
+ /**
+	 * This test checks whether there is a resource leak in the form of a growing number of threads.
+ */
+ public void resourceCleanUp(FlinkKafkaProducer10.Semantic semantic)
throws Exception {
+ String topic = "flink-kafka-producer-resource-cleanup-" +
semantic;
+
+ final int allowedEpsilonThreadCountGrow = 50;
+
+ Optional<Integer> initialActiveThreads = Optional.empty();
+ for (int i = 0; i < allowedEpsilonThreadCountGrow * 2; i++) {
+ try (OneInputStreamOperatorTestHarness<Integer, Object>
testHarness1 =
+ createTestHarness(topic, 1, 1, 0, semantic)) {
+ testHarness1.setup();
+ testHarness1.open();
+ }
+
+ if (initialActiveThreads.isPresent()) {
+ assertThat("active threads count",
+ Thread.activeCount(),
+ lessThan(initialActiveThreads.get() +
allowedEpsilonThreadCountGrow));
+ }
+ else {
+ initialActiveThreads =
Optional.of(Thread.activeCount());
+ }
+ }
+ }
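
The resourceCleanUp check above bounds thread growth rather than expecting an exact count, because the Kafka clients and the test harness spin up background threads of their own. A minimal standalone sketch of the same idea, using only the JDK; the openAndClose callback, the iteration count and the epsilon of 50 threads are illustrative placeholders, not values taken from the connector:

	import java.util.OptionalInt;

	public final class ThreadLeakCheck {

		/**
		 * Repeatedly runs the given action and fails if the number of live threads
		 * keeps growing beyond a fixed epsilon over the first measurement.
		 */
		public static void assertBoundedThreadGrowth(Runnable openAndClose, int iterations, int epsilon) {
			OptionalInt initial = OptionalInt.empty();
			for (int i = 0; i < iterations; i++) {
				openAndClose.run();
				int active = Thread.activeCount();
				if (initial.isPresent()) {
					if (active >= initial.getAsInt() + epsilon) {
						throw new AssertionError("Possible thread leak: " + active + " active threads");
					}
				} else {
					initial = OptionalInt.of(active);
				}
			}
		}

		public static void main(String[] args) {
			// No-op action: the thread count should stay flat.
			assertBoundedThreadGrowth(() -> { }, 100, 50);
		}
	}

The first measurement is taken only after one full open/close cycle, so one-time initialization threads do not count against the bound.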
+
+ /**
+	 * This test ensures that transactions reusing transactional.ids (after returning to the pool) will not clash
+	 * with previous transactions using the same transactional.ids.
+ */
+ @Test
+ public void testRestoreToCheckpointAfterExceedingProducersPool() throws
Exception {
+ String topic = "flink-kafka-producer-fail-before-notify";
+
+ try (OneInputStreamOperatorTestHarness<Integer, Object>
testHarness1 = createTestHarness(topic)) {
+ testHarness1.setup();
+ testHarness1.open();
+ testHarness1.processElement(42, 0);
+ OperatorSubtaskState snapshot =
testHarness1.snapshot(0, 0);
+ testHarness1.processElement(43, 0);
+ testHarness1.notifyOfCompletedCheckpoint(0);
+ try {
+ for (int i = 0; i <
FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE; i++) {
+ testHarness1.snapshot(i + 1, 0);
+ testHarness1.processElement(i, 0);
+ }
+ throw new IllegalStateException("This should
not be reached.");
+ }
+ catch (Exception ex) {
+ if
(!isCausedBy(FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY, ex)) {
+ throw ex;
+ }
+ }
+
+			// Resume transactions before testHarness1 is closed (in case of failure, close() might not be called at all)
+ try (OneInputStreamOperatorTestHarness<Integer, Object>
testHarness2 = createTestHarness(topic)) {
+ testHarness2.setup();
+ // restore from snapshot1, transactions with
records 43 and 44 should be aborted
+ testHarness2.initializeState(snapshot);
+ testHarness2.open();
+ }
+
+ assertExactlyOnceForTopic(createProperties(), topic, 0,
Arrays.asList(42), 30_000L);
+ deleteTestTopic(topic);
+ }
+ catch (Exception ex) {
+ // testHarness1 will be fenced off after creating and
closing testHarness2
+ if (!findThrowable(ex,
ProducerFencedException.class).isPresent()) {
+ throw ex;
+ }
+ }
+ }
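
The catch block above tolerates ProducerFencedException because testHarness2 re-initializes the same transactional.ids, which fences off the still-open producers of testHarness1. A minimal sketch of that fencing behaviour with the plain Kafka producer API; the broker address localhost:9092, the topic demo-topic and the id fencing-demo are placeholders, and the snippet assumes a running broker with transactions enabled:

	import java.util.Properties;

	import org.apache.kafka.clients.producer.KafkaProducer;
	import org.apache.kafka.clients.producer.ProducerRecord;
	import org.apache.kafka.common.errors.ProducerFencedException;
	import org.apache.kafka.common.serialization.StringSerializer;

	public final class FencingSketch {

		private static Properties transactionalProps(String transactionalId) {
			Properties props = new Properties();
			props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
			props.put("transactional.id", transactionalId); // the same id is used by both producers
			props.put("key.serializer", StringSerializer.class.getName());
			props.put("value.serializer", StringSerializer.class.getName());
			return props;
		}

		public static void main(String[] args) {
			String transactionalId = "fencing-demo";
			KafkaProducer<String, String> first = new KafkaProducer<>(transactionalProps(transactionalId));
			first.initTransactions();
			first.beginTransaction();
			first.send(new ProducerRecord<>("demo-topic", "k", "v"));

			// A second producer with the same transactional.id takes over that id and bumps
			// its epoch, which fences off the first producer.
			KafkaProducer<String, String> second = new KafkaProducer<>(transactionalProps(transactionalId));
			second.initTransactions();

			try {
				first.commitTransaction(); // fails: the first producer has been fenced
			} catch (ProducerFencedException expected) {
				System.out.println("first producer was fenced as expected");
			} finally {
				first.close();
				second.close();
			}
		}
	}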
+
+ @Test
+ public void testFlinkKafkaProducer10FailBeforeNotify() throws Exception
{
+ String topic = "flink-kafka-producer-fail-before-notify";
+
+ OneInputStreamOperatorTestHarness<Integer, Object> testHarness
= createTestHarness(topic);
+
+ testHarness.setup();
+ testHarness.open();
+ testHarness.processElement(42, 0);
+ testHarness.snapshot(0, 1);
+ testHarness.processElement(43, 2);
+ OperatorSubtaskState snapshot = testHarness.snapshot(1, 3);
+
+ int leaderId = kafkaServer.getLeaderToShutDown(topic);
+ failBroker(leaderId);
+
+ try {
+ testHarness.processElement(44, 4);
+ testHarness.snapshot(2, 5);
+			fail("processElement and snapshot should have failed after the leader broker was shut down");
+ }
+ catch (Exception ex) {
+ // expected
+ }
+ try {
+ testHarness.close();
+ }
+ catch (Exception ex) {
+ }
+
+ kafkaServer.restartBroker(leaderId);
+
+ testHarness = createTestHarness(topic);
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.close();
+
+ assertExactlyOnceForTopic(createProperties(), topic, 0,
Arrays.asList(42, 43), 30_000L);
+
+ deleteTestTopic(topic);
+ }
+
+ @Test
+ public void
testFlinkKafkaProducer10FailTransactionCoordinatorBeforeNotify() throws
Exception {
+ String topic =
"flink-kafka-producer-fail-transaction-coordinator-before-notify";
+
+ Properties properties = createProperties();
+
+ FlinkKafkaProducer10<Integer> kafkaProducer = new
FlinkKafkaProducer10<>(
+ topic,
+ integerKeyedSerializationSchema,
+ properties,
+ FlinkKafkaProducer10.Semantic.EXACTLY_ONCE);
+
+ OneInputStreamOperatorTestHarness<Integer, Object> testHarness1
= new OneInputStreamOperatorTestHarness<>(
+ new StreamSink<>(kafkaProducer),
+ IntSerializer.INSTANCE);
+
+ testHarness1.setup();
+ testHarness1.open();
+ testHarness1.processElement(42, 0);
+ testHarness1.snapshot(0, 1);
+ testHarness1.processElement(43, 2);
+ int transactionCoordinatorId =
kafkaProducer.getTransactionCoordinatorId();
+ OperatorSubtaskState snapshot = testHarness1.snapshot(1, 3);
+
+ failBroker(transactionCoordinatorId);
+
+ try {
+ testHarness1.processElement(44, 4);
+ testHarness1.notifyOfCompletedCheckpoint(1);
+ testHarness1.close();
+ }
+ catch (Exception ex) {
+ // Expected... some random exception could be thrown by
any of the above operations.
+ }
+ finally {
+ kafkaServer.restartBroker(transactionCoordinatorId);
+ }
+
+ try (OneInputStreamOperatorTestHarness<Integer, Object>
testHarness2 = createTestHarness(topic)) {
+ testHarness2.setup();
+ testHarness2.initializeState(snapshot);
+ testHarness2.open();
+ }
+
+ assertExactlyOnceForTopic(createProperties(), topic, 0,
Arrays.asList(42, 43), 30_000L);
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+	 * This test checks whether FlinkKafkaProducer10 correctly aborts lingering transactions after a failure.
+	 * If such transactions were left lingering, consumers would be unable to read committed records
+	 * that were created after the lingering transaction.
+ */
+ @Test
+ public void testFailBeforeNotifyAndResumeWorkAfterwards() throws
Exception {
+ String topic = "flink-kafka-producer-fail-before-notify";
+
+ OneInputStreamOperatorTestHarness<Integer, Object> testHarness
= createTestHarness(topic);
+
+ testHarness.setup();
+ testHarness.open();
+ testHarness.processElement(42, 0);
+ testHarness.snapshot(0, 1);
+ testHarness.processElement(43, 2);
+ OperatorSubtaskState snapshot1 = testHarness.snapshot(1, 3);
+
+ testHarness.processElement(44, 4);
+ testHarness.snapshot(2, 5);
+ testHarness.processElement(45, 6);
+
+		// do not close the previous testHarness, to make sure that closing does not clean anything up
+		// (in case of a failure there might not be any close() at all)
+ testHarness = createTestHarness(topic);
+ testHarness.setup();
+ // restore from snapshot1, transactions with records 44 and 45
should be aborted
+ testHarness.initializeState(snapshot1);
+ testHarness.open();
+
+ // write and commit more records, after potentially lingering
transactions
+ testHarness.processElement(46, 7);
+ testHarness.snapshot(4, 8);
+ testHarness.processElement(47, 9);
+ testHarness.notifyOfCompletedCheckpoint(4);
+
+ //now we should have:
+ // - records 42 and 43 in committed transactions
+ // - aborted transactions with records 44 and 45
+ // - committed transaction with record 46
+ // - pending transaction with record 47
+ assertExactlyOnceForTopic(createProperties(), topic, 0,
Arrays.asList(42, 43, 46), 30_000L);
+
+ testHarness.close();
+ deleteTestTopic(topic);
+ }
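
The Javadoc above relies on Kafka's read_committed isolation level: a consumer configured this way stops at the first still-open (lingering) transaction in a partition and will not return records written behind it until that transaction is committed or aborted, which is what assertExactlyOnceForTopic effectively checks. A minimal sketch of such a consumer; the broker address and topic name are placeholders:

	import java.util.Collections;
	import java.util.Properties;

	import org.apache.kafka.clients.consumer.ConsumerRecord;
	import org.apache.kafka.clients.consumer.ConsumerRecords;
	import org.apache.kafka.clients.consumer.KafkaConsumer;
	import org.apache.kafka.common.serialization.StringDeserializer;

	public final class ReadCommittedSketch {

		public static void main(String[] args) {
			Properties props = new Properties();
			props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
			props.put("group.id", "read-committed-demo");
			props.put("auto.offset.reset", "earliest");
			props.put("isolation.level", "read_committed"); // hide records of open and aborted transactions
			props.put("key.deserializer", StringDeserializer.class.getName());
			props.put("value.deserializer", StringDeserializer.class.getName());

			try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
				consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
				ConsumerRecords<String, String> records = consumer.poll(1000);
				for (ConsumerRecord<String, String> record : records) {
					// Only records from committed transactions show up here; a lingering open
					// transaction with a lower offset blocks everything behind it.
					System.out.println(record.offset() + " -> " + record.value());
				}
			}
		}
	}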
+
+ @Test
+ public void testFailAndRecoverSameCheckpointTwice() throws Exception {
+ String topic =
"flink-kafka-producer-fail-and-recover-same-checkpoint-twice";
+
+ OperatorSubtaskState snapshot1;
+ try (OneInputStreamOperatorTestHarness<Integer, Object>
testHarness = createTestHarness(topic)) {
+ testHarness.setup();
+ testHarness.open();
+ testHarness.processElement(42, 0);
+ testHarness.snapshot(0, 1);
+ testHarness.processElement(43, 2);
+ snapshot1 = testHarness.snapshot(1, 3);
+
+ testHarness.processElement(44, 4);
+ }
+
+ try (OneInputStreamOperatorTestHarness<Integer, Object>
testHarness = createTestHarness(topic)) {
+ testHarness.setup();
+ // restore from snapshot1, transactions with records 44
and 45 should be aborted
+ testHarness.initializeState(snapshot1);
+ testHarness.open();
+
+ // write and commit more records, after potentially
lingering transactions
+ testHarness.processElement(44, 7);
+ testHarness.snapshot(2, 8);
+ testHarness.processElement(45, 9);
+ }
+
+ try (OneInputStreamOperatorTestHarness<Integer, Object>
testHarness = createTestHarness(topic)) {
+ testHarness.setup();
+ // restore from snapshot1, transactions with records 44
and 45 should be aborted
+ testHarness.initializeState(snapshot1);
+ testHarness.open();
+
+ // write and commit more records, after potentially
lingering transactions
+ testHarness.processElement(44, 7);
+ testHarness.snapshot(3, 8);
+ testHarness.processElement(45, 9);
+ }
+
+ //now we should have:
+ // - records 42 and 43 in committed transactions
+ // - aborted transactions with records 44 and 45
+ assertExactlyOnceForTopic(createProperties(), topic, 0,
Arrays.asList(42, 43), 30_000L);
+ deleteTestTopic(topic);
+ }
+
+ /**
+	 * This test checks whether FlinkKafkaProducer10 correctly aborts lingering transactions after a failure
+	 * that happened before the first checkpoint and was followed by reducing the parallelism.
+	 * If such transactions were left lingering, consumers would be unable to read committed records
+	 * that were created after the lingering transaction.
+ */
+ @Test
+ public void testScaleDownBeforeFirstCheckpoint() throws Exception {
+ String topic = "scale-down-before-first-checkpoint";
+
+ List<AutoCloseable> operatorsToClose = new ArrayList<>();
+ int preScaleDownParallelism = Math.max(2,
FlinkKafkaProducer10.SAFE_SCALE_DOWN_FACTOR);
+ for (int subtaskIndex = 0; subtaskIndex <
preScaleDownParallelism; subtaskIndex++) {
+ OneInputStreamOperatorTestHarness<Integer, Object>
preScaleDownOperator = createTestHarness(
+ topic,
+ preScaleDownParallelism,
+ preScaleDownParallelism,
+ subtaskIndex,
+ FlinkKafkaProducer10.Semantic.EXACTLY_ONCE);
+
+ preScaleDownOperator.setup();
+ preScaleDownOperator.open();
+ preScaleDownOperator.processElement(subtaskIndex * 2,
0);
+ preScaleDownOperator.snapshot(0, 1);
+ preScaleDownOperator.processElement(subtaskIndex * 2 +
1, 2);
+
+ operatorsToClose.add(preScaleDownOperator);
+ }
+
+		// do not close the previous testHarnesses, to make sure that closing does not clean anything up
+		// (in case of a failure there might not be any close() at all)
+
+ // After previous failure simulate restarting application with
smaller parallelism
+ OneInputStreamOperatorTestHarness<Integer, Object>
postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0,
FlinkKafkaProducer10.Semantic.EXACTLY_ONCE);
+
+ postScaleDownOperator1.setup();
+ postScaleDownOperator1.open();
+
+ // write and commit more records, after potentially lingering
transactions
+ postScaleDownOperator1.processElement(46, 7);
+ postScaleDownOperator1.snapshot(4, 8);
+ postScaleDownOperator1.processElement(47, 9);
+ postScaleDownOperator1.notifyOfCompletedCheckpoint(4);
+
+ //now we should have:
+ // - records 42, 43, 44 and 45 in aborted transactions
+ // - committed transaction with record 46
+ // - pending transaction with record 47
+ assertExactlyOnceForTopic(createProperties(), topic, 0,
Arrays.asList(46), 30_000L);
+
+ postScaleDownOperator1.close();
+ // ignore ProducerFencedExceptions, because
postScaleDownOperator1 could reuse transactional ids.
+ for (AutoCloseable operatorToClose : operatorsToClose) {
+ closeIgnoringProducerFenced(operatorToClose);
+ }
+ deleteTestTopic(topic);
+ }
+
+ /**
+	 * Each instance of FlinkKafkaProducer10 uses its own pool of transactional ids. After a restore from a checkpoint,
+	 * the transactional ids are redistributed across the subtasks. In case of scale down, the surplus transactional ids
+	 * are dropped. In case of scale up, new ones are generated (for the new subtasks). This test makes sure that the
+	 * sequence of scaling down and up again works fine. In particular, it checks that the ids newly generated during
+	 * scale up do not overlap with ids that were used before the scale down. For example, we start with 4 ids and
+	 * parallelism 4:
+	 * [1], [2], [3], [4] - one id assigned to each subtask
+	 * we scale down to parallelism 2:
+	 * [1, 2], [3, 4] - the first subtask got ids 1 and 2, the second got ids 3 and 4
+	 * surplus ids are dropped from the pools and we scale up to parallelism 3:
+	 * [1 or 2], [3 or 4], [???]
+	 * the new subtask has to generate new id(s), but it cannot use ids that are potentially still in use, so it has to
+	 * generate new ones that are greater than 4.
+ */
+ @Test
+ public void testScaleUpAfterScalingDown() throws Exception {
+ String topic = "scale-down-before-first-checkpoint";
+
+ final int parallelism1 = 4;
+ final int parallelism2 = 2;
+ final int parallelism3 = 3;
+ final int maxParallelism = Math.max(parallelism1,
Math.max(parallelism2, parallelism3));
+
+ List<OperatorStateHandle> operatorSubtaskState =
repartitionAndExecute(
+ topic,
+ Collections.emptyList(),
+ parallelism1,
+ maxParallelism,
+ IntStream.range(0, parallelism1).boxed().iterator());
+
+ operatorSubtaskState = repartitionAndExecute(
+ topic,
+ operatorSubtaskState,
+ parallelism2,
+ maxParallelism,
+ IntStream.range(parallelism1, parallelism1 +
parallelism2).boxed().iterator());
+
+ operatorSubtaskState = repartitionAndExecute(
+ topic,
+ operatorSubtaskState,
+ parallelism3,
+ maxParallelism,
+ IntStream.range(parallelism1 + parallelism2,
parallelism1 + parallelism2 + parallelism3).boxed().iterator());
+
+		// After each of the previous repartitionAndExecute calls, we are left with some lingering transactions that
+		// would not allow us to read all committed messages from the topic. Thus we initialize the operators from the
+		// OperatorSubtaskState once more, but without any new data. This should terminate all ongoing transactions.
+
+ operatorSubtaskState = repartitionAndExecute(
+ topic,
+ operatorSubtaskState,
+ 1,
+ maxParallelism,
+ Collections.emptyIterator());
+
+ assertExactlyOnceForTopic(
+ createProperties(),
+ topic,
+ 0,
+ IntStream.range(0, parallelism1 + parallelism2 +
parallelism3).boxed().collect(Collectors.toList()),
+ 30_000L);
+ deleteTestTopic(topic);
+ }
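
The Javadoc above reasons about which transactional ids may still be in use after scaling down and up again. A small sketch of that bookkeeping with plain integers; it only illustrates the invariant that fresh ids must start above the largest id ever handed out, under the assumption that ids are dealt out in contiguous blocks, and it is not the connector's actual id-assignment code:

	import java.util.ArrayList;
	import java.util.List;

	public final class TransactionalIdPoolSketch {

		/** Deals out {@code poolSize} ids per subtask, starting from {@code nextFreeId}. */
		private static List<List<Integer>> assignIds(int parallelism, int poolSize, int nextFreeId) {
			List<List<Integer>> perSubtask = new ArrayList<>();
			for (int subtask = 0; subtask < parallelism; subtask++) {
				List<Integer> ids = new ArrayList<>();
				for (int i = 0; i < poolSize; i++) {
					ids.add(nextFreeId++);
				}
				perSubtask.add(ids);
			}
			return perSubtask;
		}

		public static void main(String[] args) {
			// Start with parallelism 4, one id per subtask: [1], [2], [3], [4].
			List<List<Integer>> initial = assignIds(4, 1, 1);
			System.out.println("parallelism 4: " + initial);

			// Scale down to 2: the state (and therefore the ids) is merged, e.g. [1, 2], [3, 4];
			// surplus ids are dropped from the pools but may still back open transactions.
			int maxIdPossiblyInUse = 4;

			// Scale up to 3: the new subtask must not reuse any id <= maxIdPossiblyInUse,
			// so its fresh ids start at 5.
			List<List<Integer>> fresh = assignIds(1, 1, maxIdPossiblyInUse + 1);
			System.out.println("fresh ids for the new subtask: " + fresh);
		}
	}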
+
+ private List<OperatorStateHandle> repartitionAndExecute(
+ String topic,
+ List<OperatorStateHandle> inputStates,
+ int parallelism,
+ int maxParallelism,
+ Iterator<Integer> inputData) throws Exception {
+
+ List<OperatorStateHandle> outputStates = new ArrayList<>();
+ List<OneInputStreamOperatorTestHarness<Integer, Object>>
testHarnesses = new ArrayList<>();
+
+ for (int subtaskIndex = 0; subtaskIndex < parallelism;
subtaskIndex++) {
+ OneInputStreamOperatorTestHarness<Integer, Object>
testHarness =
+ createTestHarness(topic, maxParallelism,
parallelism, subtaskIndex, FlinkKafkaProducer10.Semantic.EXACTLY_ONCE);
+ testHarnesses.add(testHarness);
+
+ testHarness.setup();
+
+ testHarness.initializeState(new OperatorSubtaskState(
+ new StateObjectCollection<>(inputStates),
+ StateObjectCollection.empty(),
+ StateObjectCollection.empty(),
+ StateObjectCollection.empty()));
+ testHarness.open();
+
+ if (inputData.hasNext()) {
+ int nextValue = inputData.next();
+ testHarness.processElement(nextValue, 0);
+ OperatorSubtaskState snapshot =
testHarness.snapshot(0, 0);
+
+
outputStates.addAll(snapshot.getManagedOperatorState());
+
checkState(snapshot.getRawOperatorState().isEmpty(), "Unexpected raw operator
state");
+
checkState(snapshot.getManagedKeyedState().isEmpty(), "Unexpected managed keyed
state");
+
checkState(snapshot.getRawKeyedState().isEmpty(), "Unexpected raw keyed state");
+
+ for (int i = 1; i <
FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) {
+ testHarness.processElement(-nextValue,
0);
+ testHarness.snapshot(i, 0);
+ }
+ }
+ }
+
+ for (OneInputStreamOperatorTestHarness<Integer, Object>
testHarness : testHarnesses) {
+ testHarness.close();
+ }
+
+ return outputStates;
+ }
+
+ @Test
+ public void testRecoverCommittedTransaction() throws Exception {
+ String topic =
"flink-kafka-producer-recover-committed-transaction";
+
+ OneInputStreamOperatorTestHarness<Integer, Object> testHarness
= createTestHarness(topic);
+
+ testHarness.setup();
+ testHarness.open(); // producerA - start transaction (txn) 0
+ testHarness.processElement(42, 0); // producerA - write 42 in
txn 0
+ OperatorSubtaskState checkpoint0 = testHarness.snapshot(0, 1);
// producerA - pre commit txn 0, producerB - start txn 1
+ testHarness.processElement(43, 2); // producerB - write 43 in
txn 1
+ testHarness.notifyOfCompletedCheckpoint(0); // producerA -
commit txn 0 and return to the pool
+		testHarness.snapshot(1, 3); // producerB - pre commit txn 1, producerA - start txn 2
+ testHarness.processElement(44, 4); // producerA - write 44 in
txn 2
+ testHarness.close(); // producerA - abort txn 2
+
+ testHarness = createTestHarness(topic);
+		testHarness.initializeState(checkpoint0); // recover state 0 - producerA recovers and commits txn 0
+ testHarness.close();
+
+ assertExactlyOnceForTopic(createProperties(), topic, 0,
Arrays.asList(42), 30_000L);
+
+ deleteTestTopic(topic);
+ }
+
+ @Test
+ public void testRunOutOfProducersInThePool() throws Exception {
+ String topic = "flink-kafka-run-out-of-producers";
+
+ try (OneInputStreamOperatorTestHarness<Integer, Object>
testHarness = createTestHarness(topic)) {
+
+ testHarness.setup();
+ testHarness.open();
+
+ for (int i = 0; i <
FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE * 2; i++) {
+ testHarness.processElement(i, i * 2);
+ testHarness.snapshot(i, i * 2 + 1);
+ }
+ }
+ catch (Exception ex) {
+ if (!ex.getCause().getMessage().startsWith("Too many
ongoing")) {
+ throw ex;
+ }
+ }
+ deleteTestTopic(topic);
+ }
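
testRunOutOfProducersInThePool above provokes the "Too many ongoing" error by taking repeated snapshots without ever calling notifyOfCompletedCheckpoint, so no transaction is committed and nothing returns to the pool. A minimal sketch of why a fixed pool runs dry in that situation; the pool here holds plain id strings, whereas the connector manages actual Kafka producers and their transactional ids, so this is an illustration only:

	import java.util.ArrayDeque;
	import java.util.Deque;

	public final class ProducerPoolSketch {

		private final Deque<String> availableIds = new ArrayDeque<>();

		public ProducerPoolSketch(int poolSize) {
			for (int i = 0; i < poolSize; i++) {
				availableIds.add("transactional-id-" + i);
			}
		}

		/** Called on snapshot: the pending transaction keeps its id until its checkpoint completes. */
		public String borrowIdForNewTransaction() {
			String id = availableIds.poll();
			if (id == null) {
				throw new IllegalStateException("Too many ongoing snapshots: the pool of transactional ids is empty");
			}
			return id;
		}

		/** Called when the checkpoint that committed the transaction completes. */
		public void returnId(String id) {
			availableIds.add(id);
		}

		public static void main(String[] args) {
			ProducerPoolSketch pool = new ProducerPoolSketch(5);
			for (int i = 0; i < 5; i++) {
				pool.borrowIdForNewTransaction(); // ids are now held by pending transactions
			}
			try {
				pool.borrowIdForNewTransaction(); // sixth snapshot without any completed checkpoint
			} catch (IllegalStateException expected) {
				System.out.println(expected.getMessage());
			}
		}
	}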
+
+ // shut down a Kafka broker
+ private void failBroker(int brokerId) {
+ KafkaServer toShutDown = null;
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+
+ if (kafkaServer.getBrokerId(server) == brokerId) {
+ toShutDown = server;
+ break;
+ }
+ }
+
+ if (toShutDown == null) {
+ StringBuilder listOfBrokers = new StringBuilder();
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+
listOfBrokers.append(kafkaServer.getBrokerId(server));
+ listOfBrokers.append(" ; ");
+ }
+
+ throw new IllegalArgumentException("Cannot find broker
to shut down: " + brokerId
+ + " ; available brokers: " +
listOfBrokers.toString());
+ } else {
+ toShutDown.shutdown();
+ toShutDown.awaitShutdown();
+ }
+ }
+
+ private void closeIgnoringProducerFenced(AutoCloseable autoCloseable)
throws Exception {
+ try {
+ autoCloseable.close();
+ }
+ catch (Exception ex) {
+ if (!(ex.getCause() instanceof
ProducerFencedException)) {
+ throw ex;
+ }
+ }
+ }
+
+ private OneInputStreamOperatorTestHarness<Integer, Object>
createTestHarness(String topic) throws Exception {
+ return createTestHarness(topic, 1, 1, 0,
FlinkKafkaProducer10.Semantic.EXACTLY_ONCE);
+ }
+
+ private OneInputStreamOperatorTestHarness<Integer, Object>
createTestHarness(
+ String topic,
+ int maxParallelism,
+ int parallelism,
+ int subtaskIndex,
+ FlinkKafkaProducer10.Semantic semantic) throws Exception {
+ Properties properties = createProperties();
+
+ FlinkKafkaProducer10<Integer> kafkaProducer = new
FlinkKafkaProducer10<>(
+ topic,
+ integerKeyedSerializationSchema,
+ properties,
+ semantic);
+
+ return new OneInputStreamOperatorTestHarness<>(
+ new StreamSink<>(kafkaProducer),
+ maxParallelism,
+ parallelism,
+ subtaskIndex,
+ IntSerializer.INSTANCE,
+ new OperatorID(42, 44));
+ }
+
+ private Properties createProperties() {
+ Properties properties = new Properties();
+ properties.putAll(standardProps);
+ properties.putAll(secureProps);
+ properties.put(FlinkKafkaProducer10.KEY_DISABLE_METRICS,
"true");
+ return properties;
+ }
+
+ private boolean isCausedBy(FlinkKafka011ErrorCode expectedErrorCode,
Throwable ex) {
+ Optional<FlinkKafka011Exception> cause = findThrowable(ex,
FlinkKafka011Exception.class);
+ if (cause.isPresent()) {
+ return
cause.get().getErrorCode().equals(expectedErrorCode);
+ }
+ return false;
+ }
+
+}
diff --git
a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10StateSerializerTest.java
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10StateSerializerTest.java
new file mode 100644
index 00000000000..47cc2ea972e
--- /dev/null
+++
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10StateSerializerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.TransactionHolder;
+
+import java.util.Collections;
+import java.util.Optional;
+
+/**
+ * A test for the {@link TypeSerializer TypeSerializers} used for the Kafka
producer state.
+ */
+public class FlinkKafkaProducer10StateSerializerTest
+ extends SerializerTestBase<
+ TwoPhaseCommitSinkFunction.State<
+ FlinkKafkaProducer10.KafkaTransactionState,
+ FlinkKafkaProducer10.KafkaTransactionContext>> {
+
+ @Override
+ protected TypeSerializer<
+ TwoPhaseCommitSinkFunction.State<
+ FlinkKafkaProducer10.KafkaTransactionState,
+ FlinkKafkaProducer10.KafkaTransactionContext>>
createSerializer() {
+ return new TwoPhaseCommitSinkFunction.StateSerializer<>(
+ new FlinkKafkaProducer10.TransactionStateSerializer(),
+ new FlinkKafkaProducer10.ContextStateSerializer());
+ }
+
+ @Override
+ protected Class<TwoPhaseCommitSinkFunction.State<
+ FlinkKafkaProducer10.KafkaTransactionState,
+ FlinkKafkaProducer10.KafkaTransactionContext>>
getTypeClass() {
+ return (Class) TwoPhaseCommitSinkFunction.State.class;
+ }
+
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ @Override
+ protected TwoPhaseCommitSinkFunction.State<
+ FlinkKafkaProducer10.KafkaTransactionState,
+ FlinkKafkaProducer10.KafkaTransactionContext>[] getTestData() {
+ //noinspection unchecked
+ return new TwoPhaseCommitSinkFunction.State[] {
+ new TwoPhaseCommitSinkFunction.State<
+ FlinkKafkaProducer10.KafkaTransactionState,
+ FlinkKafkaProducer10.KafkaTransactionContext>(
+ new TransactionHolder(new
FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
+ Collections.emptyList(),
+ Optional.empty()),
+ new TwoPhaseCommitSinkFunction.State<
+ FlinkKafkaProducer10.KafkaTransactionState,
+ FlinkKafkaProducer10.KafkaTransactionContext>(
+ new TransactionHolder(new
FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 2711),
+ Collections.singletonList(new
TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L,
(short) 42, null), 42)),
+ Optional.empty()),
+ new TwoPhaseCommitSinkFunction.State<
+ FlinkKafkaProducer10.KafkaTransactionState,
+ FlinkKafkaProducer10.KafkaTransactionContext>(
+ new TransactionHolder(new
FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
+ Collections.emptyList(),
+ Optional.of(new
FlinkKafkaProducer10.KafkaTransactionContext(Collections.emptySet()))),
+ new TwoPhaseCommitSinkFunction.State<
+ FlinkKafkaProducer10.KafkaTransactionState,
+ FlinkKafkaProducer10.KafkaTransactionContext>(
+ new TransactionHolder(new
FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
+ Collections.emptyList(),
+ Optional.of(new
FlinkKafkaProducer10.KafkaTransactionContext(Collections.singleton("hello")))),
+ new TwoPhaseCommitSinkFunction.State<
+ FlinkKafkaProducer10.KafkaTransactionState,
+ FlinkKafkaProducer10.KafkaTransactionContext>(
+ new TransactionHolder(new
FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
+ Collections.singletonList(new
TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L,
(short) 42, null), 0)),
+ Optional.of(new
FlinkKafkaProducer10.KafkaTransactionContext(Collections.emptySet()))),
+ new TwoPhaseCommitSinkFunction.State<
+ FlinkKafkaProducer10.KafkaTransactionState,
+ FlinkKafkaProducer10.KafkaTransactionContext>(
+ new TransactionHolder(new
FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
+ Collections.singletonList(new
TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L,
(short) 42, null), 0)),
+ Optional.of(new
FlinkKafkaProducer10.KafkaTransactionContext(Collections.singleton("hello"))))
+ };
+ }
+
+ @Override
+ public void testInstantiate() {
+ // this serializer does not support instantiation
+ }
+}
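
SerializerTestBase drives the serializer returned by createSerializer() through serialize/deserialize and copy round trips over the instances from getTestData(). A minimal hand-rolled round trip of that kind, shown with Flink's LongSerializer instead of the producer-state serializer to keep the sketch self-contained; the helper name roundTrip is an arbitrary choice:

	import java.io.ByteArrayInputStream;
	import java.io.ByteArrayOutputStream;
	import java.io.IOException;

	import org.apache.flink.api.common.typeutils.TypeSerializer;
	import org.apache.flink.api.common.typeutils.base.LongSerializer;
	import org.apache.flink.core.memory.DataInputViewStreamWrapper;
	import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

	public final class SerializerRoundTripSketch {

		static <T> T roundTrip(TypeSerializer<T> serializer, T value) throws IOException {
			ByteArrayOutputStream bytes = new ByteArrayOutputStream();
			serializer.serialize(value, new DataOutputViewStreamWrapper(bytes));
			return serializer.deserialize(
				new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray())));
		}

		public static void main(String[] args) throws IOException {
			TypeSerializer<Long> serializer = LongSerializer.INSTANCE;
			Long restored = roundTrip(serializer, 42L);
			if (!restored.equals(42L)) {
				throw new AssertionError("round trip changed the value: " + restored);
			}
			System.out.println("round trip ok: " + restored);
		}
	}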
diff --git
a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ITCase.java
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ITCase.java
new file mode 100644
index 00000000000..d5521728061
--- /dev/null
+++
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ITCase.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * IT cases for Kafka 1.0.
+ */
+public class Kafka10ITCase extends KafkaConsumerTestBase {
+
+ @BeforeClass
+ public static void prepare() throws ClassNotFoundException {
+ KafkaProducerTestBase.prepare();
+ ((KafkaTestEnvironmentImpl)
kafkaServer).setProducerSemantic(FlinkKafkaProducer10.Semantic.AT_LEAST_ONCE);
+ }
+
+ //
------------------------------------------------------------------------
+ // Suite of Tests
+ //
------------------------------------------------------------------------
+
+ @Test(timeout = 60000)
+ public void testFailOnNoBroker() throws Exception {
+ runFailOnNoBrokerTest();
+ }
+
+ @Test(timeout = 60000)
+ public void testConcurrentProducerConsumerTopology() throws Exception {
+ runSimpleConcurrentProducerConsumerTopology();
+ }
+
+ @Test(timeout = 60000)
+ public void testKeyValueSupport() throws Exception {
+ runKeyValueTest();
+ }
+
+ // --- canceling / failures ---
+
+ @Test(timeout = 60000)
+ public void testCancelingEmptyTopic() throws Exception {
+ runCancelingOnEmptyInputTest();
+ }
+
+ @Test(timeout = 60000)
+ public void testCancelingFullTopic() throws Exception {
+ runCancelingOnFullInputTest();
+ }
+
+ // --- source to partition mappings and exactly once ---
+
+ @Test(timeout = 60000)
+ public void testOneToOneSources() throws Exception {
+ runOneToOneExactlyOnceTest();
+ }
+
+ @Test(timeout = 60000)
+ public void testOneSourceMultiplePartitions() throws Exception {
+ runOneSourceMultiplePartitionsExactlyOnceTest();
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleSourcesOnePartition() throws Exception {
+ runMultipleSourcesOnePartitionExactlyOnceTest();
+ }
+
+ // --- broker failure ---
+
+ @Test(timeout = 60000)
+ public void testBrokerFailure() throws Exception {
+ runBrokerFailureTest();
+ }
+
+ // --- special executions ---
+
+ @Test(timeout = 60000)
+ public void testBigRecordJob() throws Exception {
+ runBigRecordTestTopology();
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleTopics() throws Exception {
+ runProduceConsumeMultipleTopics();
+ }
+
+ @Test(timeout = 60000)
+ public void testAllDeletes() throws Exception {
+ runAllDeletesTest();
+ }
+
+ @Test(timeout = 60000)
+ public void testMetricsAndEndOfStream() throws Exception {
+ runEndOfStreamTest();
+ }
+
+ // --- startup mode ---
+
+ @Test(timeout = 60000)
+ public void testStartFromEarliestOffsets() throws Exception {
+ runStartFromEarliestOffsets();
+ }
+
+ @Test(timeout = 60000)
+ public void testStartFromLatestOffsets() throws Exception {
+ runStartFromLatestOffsets();
+ }
+
+ @Test(timeout = 60000)
+ public void testStartFromGroupOffsets() throws Exception {
+ runStartFromGroupOffsets();
+ }
+
+ @Test(timeout = 60000)
+ public void testStartFromSpecificOffsets() throws Exception {
+ runStartFromSpecificOffsets();
+ }
+
+ @Test(timeout = 60000)
+ public void testStartFromTimestamp() throws Exception {
+ runStartFromTimestamp();
+ }
+
+ // --- offset committing ---
+
+ @Test(timeout = 60000)
+ public void testCommitOffsetsToKafka() throws Exception {
+ runCommitOffsetsToKafka();
+ }
+
+ @Test(timeout = 60000)
+ public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+ runAutoOffsetRetrievalAndCommitToKafka();
+ }
+
+ /**
+	 * Kafka 1.0 specific test, ensuring timestamps are properly written to and read from Kafka.
+ */
+ @Test(timeout = 60000)
+ public void testTimestamps() throws Exception {
+
+ final String topic = "tstopic";
+ createTestTopic(topic, 3, 1);
+
+ // ---------- Produce an event time stream into Kafka
-------------------
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ DataStream<Long> streamWithTimestamps = env.addSource(new
SourceFunction<Long>() {
+ private static final long serialVersionUID =
-2255115836471289626L;
+ boolean running = true;
+
+ @Override
+ public void run(SourceContext<Long> ctx) throws
Exception {
+ long i = 0;
+ while (running) {
+ ctx.collectWithTimestamp(i, i * 2);
+ if (i++ == 1110L) {
+ running = false;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ final TypeInformationSerializationSchema<Long> longSer = new
TypeInformationSerializationSchema<>(Types.LONG, env.getConfig());
+ FlinkKafkaProducer10<Long> prod = new
FlinkKafkaProducer10<>(topic, new KeyedSerializationSchemaWrapper<>(longSer),
standardProps, Optional.of(new FlinkKafkaPartitioner<Long>() {
+ private static final long serialVersionUID =
-6730989584364230617L;
+
+ @Override
+ public int partition(Long next, byte[] key, byte[]
value, String targetTopic, int[] partitions) {
+ return (int) (next % 3);
+ }
+ }));
+ prod.setWriteTimestampToKafka(true);
+
+ streamWithTimestamps.addSink(prod).setParallelism(3);
+
+ env.execute("Produce some");
+
+ // ---------- Consume stream from Kafka -------------------
+
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ FlinkKafkaConsumer10<Long> kafkaSource = new
FlinkKafkaConsumer10<>(topic, new Kafka10ITCase.LimitedLongDeserializer(),
standardProps);
+ kafkaSource.assignTimestampsAndWatermarks(new
AssignerWithPunctuatedWatermarks<Long>() {
+ private static final long serialVersionUID =
-4834111173247835189L;
+
+ @Nullable
+ @Override
+ public Watermark checkAndGetNextWatermark(Long
lastElement, long extractedTimestamp) {
+ if (lastElement % 11 == 0) {
+ return new Watermark(lastElement);
+ }
+ return null;
+ }
+
+ @Override
+ public long extractTimestamp(Long element, long
previousElementTimestamp) {
+ return previousElementTimestamp;
+ }
+ });
+
+ DataStream<Long> stream = env.addSource(kafkaSource);
+ GenericTypeInfo<Object> objectTypeInfo = new
GenericTypeInfo<>(Object.class);
+ stream.transform("timestamp validating operator",
objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1);
+
+ env.execute("Consume again");
+
+ deleteTestTopic(topic);
+ }
+
+ private static class TimestampValidatingOperator extends
StreamSink<Long> {
+
+ private static final long serialVersionUID =
1353168781235526806L;
+
+ public TimestampValidatingOperator() {
+ super(new SinkFunction<Long>() {
+ private static final long serialVersionUID =
-6676565693361786524L;
+
+ @Override
+ public void invoke(Long value) throws Exception
{
+ throw new
RuntimeException("Unexpected");
+ }
+ });
+ }
+
+ long elCount = 0;
+ long wmCount = 0;
+ long lastWM = Long.MIN_VALUE;
+
+ @Override
+ public void processElement(StreamRecord<Long> element) throws
Exception {
+ elCount++;
+ if (element.getValue() * 2 != element.getTimestamp()) {
+ throw new RuntimeException("Invalid timestamp:
" + element);
+ }
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ wmCount++;
+
+ if (lastWM <= mark.getTimestamp()) {
+ lastWM = mark.getTimestamp();
+ } else {
+				throw new RuntimeException("Received watermark lower than the last one");
+ }
+
+ if (mark.getTimestamp() % 11 != 0 &&
mark.getTimestamp() != Long.MAX_VALUE) {
+ throw new RuntimeException("Invalid watermark:
" + mark.getTimestamp());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (elCount != 1110L) {
+ throw new RuntimeException("Wrong final element
count " + elCount);
+ }
+
+ if (wmCount <= 2) {
+ throw new RuntimeException("Almost no
watermarks have been sent " + wmCount);
+ }
+ }
+ }
+
+ private static class LimitedLongDeserializer implements
KeyedDeserializationSchema<Long> {
+
+ private static final long serialVersionUID =
6966177118923713521L;
+ private final TypeInformation<Long> ti;
+ private final TypeSerializer<Long> ser;
+ long cnt = 0;
+
+ public LimitedLongDeserializer() {
+ this.ti = Types.LONG;
+ this.ser = ti.createSerializer(new ExecutionConfig());
+ }
+
+ @Override
+ public TypeInformation<Long> getProducedType() {
+ return ti;
+ }
+
+ @Override
+ public Long deserialize(byte[] messageKey, byte[] message,
String topic, int partition, long offset) throws IOException {
+ cnt++;
+ DataInputView in = new DataInputViewStreamWrapper(new
ByteArrayInputStream(message));
+ Long e = ser.deserialize(in);
+ return e;
+ }
+
+ @Override
+ public boolean isEndOfStream(Long nextElement) {
+ return cnt > 1110L;
+ }
+ }
+
+}
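
testTimestamps above checks event timestamps end to end through Flink. Since Kafka 1.0 stores a timestamp with every record, the broker-side half can also be inspected directly with a plain consumer; a minimal sketch, assuming a running broker at localhost:9092 and the tstopic topic written by the test (values are left as raw bytes so no assumption about the Flink serialization format is needed):

	import java.util.Collections;
	import java.util.Properties;

	import org.apache.kafka.clients.consumer.ConsumerRecord;
	import org.apache.kafka.clients.consumer.KafkaConsumer;
	import org.apache.kafka.common.serialization.ByteArrayDeserializer;

	public final class TimestampInspectionSketch {

		public static void main(String[] args) {
			Properties props = new Properties();
			props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
			props.put("group.id", "timestamp-inspection");
			props.put("auto.offset.reset", "earliest");
			props.put("key.deserializer", ByteArrayDeserializer.class.getName());
			props.put("value.deserializer", ByteArrayDeserializer.class.getName());

			try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
				consumer.subscribe(Collections.singletonList("tstopic")); // topic written by testTimestamps
				for (ConsumerRecord<byte[], byte[]> record : consumer.poll(1000)) {
					// The broker keeps one timestamp per record; with setWriteTimestampToKafka(true)
					// this is the Flink event timestamp of the element that produced the record.
					System.out.println("partition=" + record.partition()
						+ ", offset=" + record.offset()
						+ ", timestamp=" + record.timestamp());
				}
			}
		}
	}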
diff --git
a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerAtLeastOnceITCase.java
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerAtLeastOnceITCase.java
new file mode 100644
index 00000000000..b064d9fa6b6
--- /dev/null
+++
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerAtLeastOnceITCase.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.BeforeClass;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer10}.
+ */
+@SuppressWarnings("serial")
+public class Kafka10ProducerAtLeastOnceITCase extends KafkaProducerTestBase {
+
+ @BeforeClass
+ public static void prepare() throws ClassNotFoundException {
+ KafkaProducerTestBase.prepare();
+ ((KafkaTestEnvironmentImpl)
kafkaServer).setProducerSemantic(FlinkKafkaProducer10.Semantic.AT_LEAST_ONCE);
+ }
+
+ @Override
+ public void testExactlyOnceRegularSink() throws Exception {
+ // disable test for at least once semantic
+ }
+
+ @Override
+ public void testExactlyOnceCustomOperator() throws Exception {
+ // disable test for at least once semantic
+ }
+}
diff --git
a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerExactlyOnceITCase.java
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerExactlyOnceITCase.java
new file mode 100644
index 00000000000..8365e25e6f4
--- /dev/null
+++
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerExactlyOnceITCase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer10}.
+ */
+@SuppressWarnings("serial")
+public class Kafka10ProducerExactlyOnceITCase extends KafkaProducerTestBase {
+ @BeforeClass
+ public static void prepare() throws ClassNotFoundException {
+ KafkaProducerTestBase.prepare();
+ ((KafkaTestEnvironmentImpl)
kafkaServer).setProducerSemantic(FlinkKafkaProducer10.Semantic.EXACTLY_ONCE);
+ }
+
+ @Override
+ public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+ // TODO: fix this test
+		// Currently, very often (~50% of cases) the KafkaProducer live-locks itself on the commitTransaction call.
+		// Somehow Kafka 1.0 doesn't play along with the NetworkFailureProxy. This can either mean a bug in Kafka
+		// (that it doesn't cope well with some weird network failures), or that the NetworkFailureProxy is a broken
+		// design and this test should be reimplemented in a completely different way...
+ }
+
+ @Override
+ public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+ // TODO: fix this test
+		// Currently, very often (~50% of cases) the KafkaProducer live-locks itself on the commitTransaction call.
+		// Somehow Kafka 1.0 doesn't play along with the NetworkFailureProxy. This can either mean a bug in Kafka
+		// (that it doesn't cope well with some weird network failures), or that the NetworkFailureProxy is a broken
+		// design and this test should be reimplemented in a completely different way...
+ }
+
+ @Test
+ public void testMultipleSinkOperators() throws Exception {
+ testExactlyOnce(false, 2);
+ }
+}
diff --git
a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactoryTest.java
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactoryTest.java
new file mode 100644
index 00000000000..e557e519570
--- /dev/null
+++
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactoryTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka10TableSource} and {@link Kafka10TableSink} created
+ * by {@link Kafka10TableSourceSinkFactory}.
+ */
+public class Kafka10TableSourceSinkFactoryTest extends
KafkaTableSourceSinkFactoryTestBase {
+
+ @Override
+ protected String getKafkaVersion() {
+ return KafkaValidator.CONNECTOR_VERSION_VALUE_10;
+ }
+
+ @Override
+ protected Class<FlinkKafkaConsumerBase<Row>>
getExpectedFlinkKafkaConsumer() {
+ return (Class) FlinkKafkaConsumer10.class;
+ }
+
+ @Override
+ protected Class<?> getExpectedFlinkKafkaProducer() {
+ return FlinkKafkaProducer10.class;
+ }
+
+ @Override
+ protected KafkaTableSource getExpectedKafkaTableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ return new Kafka10TableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ Optional.of(fieldMapping),
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets);
+ }
+
+ @Override
+ protected KafkaTableSink getExpectedKafkaTableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
+ SerializationSchema<Row> serializationSchema) {
+
+ return new Kafka10TableSink(
+ schema,
+ topic,
+ properties,
+ partitioner,
+ serializationSchema);
+ }
+}
diff --git
a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 00000000000..02d5d0a3a81
--- /dev/null
+++
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.networking.NetworkFailuresProxy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+
+import kafka.common.KafkaException;
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import org.apache.commons.collections.list.UnmodifiableList;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+
+import scala.collection.mutable.ArraySeq;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the KafkaServerProvider for Kafka 1.0.
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+ protected static final Logger LOG =
LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+ private File tmpZkDir;
+ private File tmpKafkaParent;
+ private List<File> tmpKafkaDirs;
+ private List<KafkaServer> brokers;
+ private TestingServer zookeeper;
+ private String zookeeperConnectionString;
+ private String brokerConnectionString = "";
+ private Properties standardProps;
+ private FlinkKafkaProducer10.Semantic producerSemantic =
FlinkKafkaProducer10.Semantic.EXACTLY_ONCE;
+	// 6 seconds is the default; that seems to be too small for Travis, so use 30 seconds instead
+ private int zkTimeout = 30000;
+ private Config config;
+
+ public void setProducerSemantic(FlinkKafkaProducer10.Semantic
producerSemantic) {
+ this.producerSemantic = producerSemantic;
+ }
+
+ @Override
+ public void prepare(Config config) {
+		// increase the timeout, since on Travis the ZK connection takes a long time for secure connections
+ if (config.isSecureMode()) {
+			// run only one Kafka server to avoid multiple ZK connections from many instances - Travis timeout
+ config.setKafkaServersNumber(1);
+ zkTimeout = zkTimeout * 15;
+ }
+ this.config = config;
+
+ File tempDir = new File(System.getProperty("java.io.tmpdir"));
+ tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" +
(UUID.randomUUID().toString()));
+ assertTrue("cannot create zookeeper temp dir",
tmpZkDir.mkdirs());
+
+ tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" +
(UUID.randomUUID().toString()));
+ assertTrue("cannot create kafka temp dir",
tmpKafkaParent.mkdirs());
+
+ tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+ for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+ File tmpDir = new File(tmpKafkaParent, "server-" + i);
+ assertTrue("cannot create kafka temp dir",
tmpDir.mkdir());
+ tmpKafkaDirs.add(tmpDir);
+ }
+
+ zookeeper = null;
+ brokers = null;
+
+ try {
+ zookeeper = new TestingServer(-1, tmpZkDir);
+ zookeeperConnectionString =
zookeeper.getConnectString();
+ LOG.info("Starting Zookeeper with
zookeeperConnectionString: {}", zookeeperConnectionString);
+
+ LOG.info("Starting KafkaServer");
+ brokers = new
ArrayList<>(config.getKafkaServersNumber());
+
+ ListenerName listenerName =
ListenerName.forSecurityProtocol(config.isSecureMode() ?
SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+ for (int i = 0; i < config.getKafkaServersNumber();
i++) {
+ KafkaServer kafkaServer = getKafkaServer(i,
tmpKafkaDirs.get(i));
+ brokers.add(kafkaServer);
+ brokerConnectionString +=
hostAndPortToUrlString(KAFKA_HOST,
kafkaServer.socketServer().boundPort(listenerName));
+ brokerConnectionString += ",";
+ }
+
+ LOG.info("ZK and KafkaServer started.");
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ fail("Test setup failed: " + t.getMessage());
+ }
+
+ standardProps = new Properties();
+ standardProps.setProperty("zookeeper.connect",
zookeeperConnectionString);
+ standardProps.setProperty("bootstrap.servers",
brokerConnectionString);
+ standardProps.setProperty("group.id", "flink-tests");
+ standardProps.setProperty("enable.auto.commit", "false");
+ standardProps.setProperty("zookeeper.session.timeout.ms",
String.valueOf(zkTimeout));
+ standardProps.setProperty("zookeeper.connection.timeout.ms",
String.valueOf(zkTimeout));
+ standardProps.setProperty("auto.offset.reset", "earliest"); //
read from the beginning. (earliest is kafka 1.0 value)
+ standardProps.setProperty("max.partition.fetch.bytes", "256");
// make a lot of fetches (MESSAGES MUST BE SMALLER!)
+ }
+
+ @Override
+ public void deleteTestTopic(String topic) {
+ LOG.info("Deleting topic {}", topic);
+ try (AdminClient adminClient =
AdminClient.create(getStandardProperties())) {
+
adminClient.deleteTopics(Collections.singleton(topic)).all().get();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Delete test topic : " + topic + " failed, " +
e.getMessage());
+ }
+ }
+
+ @Override
+ public void createTestTopic(String topic, int numberOfPartitions, int
replicationFactor, Properties properties) {
+ LOG.info("Creating topic {}", topic);
+ try (AdminClient adminClient =
AdminClient.create(getStandardProperties())) {
+ NewTopic topicObj = new NewTopic(topic,
numberOfPartitions, (short) replicationFactor);
+
adminClient.createTopics(Collections.singleton(topicObj)).all().get();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Create test topic : " + topic + " failed, " +
e.getMessage());
+ }
+ }
+
+ @Override
+ public Properties getStandardProperties() {
+ return standardProps;
+ }
+
+ @Override
+ public Properties getSecureProperties() {
+ Properties prop = new Properties();
+ if (config.isSecureMode()) {
+ prop.put("security.inter.broker.protocol",
"SASL_PLAINTEXT");
+ prop.put("security.protocol", "SASL_PLAINTEXT");
+ prop.put("sasl.kerberos.service.name", "kafka");
+
+ //add special timeout for Travis
+ prop.setProperty("zookeeper.session.timeout.ms",
String.valueOf(zkTimeout));
+ prop.setProperty("zookeeper.connection.timeout.ms",
String.valueOf(zkTimeout));
+ prop.setProperty("metadata.fetch.timeout.ms", "120000");
+ }
+ return prop;
+ }
+
+ @Override
+ public String getBrokerConnectionString() {
+ return brokerConnectionString;
+ }
+
+ @Override
+ public String getVersion() {
+ return "1.0";
+ }
+
+ @Override
+ public List<KafkaServer> getBrokers() {
+ return brokers;
+ }
+
+ @Override
+ public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics,
KeyedDeserializationSchema<T> readSchema, Properties props) {
+ return new FlinkKafkaConsumer10<T>(topics, readSchema, props);
+ }
+
+ @Override
+ public <K, V> Collection<ConsumerRecord<K, V>>
getAllRecordsFromTopic(Properties properties, String topic, int partition, long
timeout) {
+ List<ConsumerRecord<K, V>> result = new ArrayList<>();
+
+ try (KafkaConsumer<K, V> consumer = new
KafkaConsumer<>(properties)) {
+ consumer.assign(Arrays.asList(new TopicPartition(topic,
partition)));
+
+ while (true) {
+ boolean processedAtLeastOneRecord = false;
+
+ // wait for new records with timeout and break
the loop if we didn't get any
+ Iterator<ConsumerRecord<K, V>> iterator =
consumer.poll(timeout).iterator();
+ while (iterator.hasNext()) {
+ ConsumerRecord<K, V> record =
iterator.next();
+ result.add(record);
+ processedAtLeastOneRecord = true;
+ }
+
+ if (!processedAtLeastOneRecord) {
+ break;
+ }
+ }
+ consumer.commitSync();
+ }
+
+ return UnmodifiableList.decorate(result);
+ }
+
+ @Override
+ public <T> StreamSink<T> getProducerSink(String topic,
KeyedSerializationSchema<T> serSchema, Properties props,
FlinkKafkaPartitioner<T> partitioner) {
+ return new StreamSink<>(new FlinkKafkaProducer10<T>(
+ topic,
+ serSchema,
+ props,
+ Optional.ofNullable(partitioner),
+ producerSemantic,
+
FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+ }
+
+ @Override
+ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream,
String topic, KeyedSerializationSchema<T> serSchema, Properties props,
FlinkKafkaPartitioner<T> partitioner) {
+ return stream.addSink(new FlinkKafkaProducer10<T>(
+ topic,
+ serSchema,
+ props,
+ Optional.ofNullable(partitioner),
+ producerSemantic,
+
FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+ }
+
+ @Override
+ public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T>
stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+ FlinkKafkaProducer10<T> prod = new FlinkKafkaProducer10<T>(
+ topic,
+ serSchema,
+ props,
+ Optional.of(new FlinkFixedPartitioner<>()),
+ producerSemantic,
+ FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+
+ prod.setWriteTimestampToKafka(true);
+
+ return stream.addSink(prod);
+ }
+
+ @Override
+ public KafkaOffsetHandler createOffsetHandler() {
+ return new KafkaOffsetHandlerImpl();
+ }
+
+ @Override
+ public void restartBroker(int leaderId) throws Exception {
+ brokers.set(leaderId, getKafkaServer(leaderId,
tmpKafkaDirs.get(leaderId)));
+ }
+
+ @Override
+ public int getLeaderToShutDown(String topic) throws Exception {
+ // look up the current leader of partition 0; close the admin client afterwards so the test does not leak it
+ try (AdminClient client = AdminClient.create(getStandardProperties())) {
+ TopicDescription result = client.describeTopics(Collections.singleton(topic)).all().get().get(topic);
+ return result.partitions().get(0).leader().id();
+ }
+ }
+
+ @Override
+ public int getBrokerId(KafkaServer server) {
+ return server.config().brokerId();
+ }
+
+ @Override
+ public boolean isSecureRunSupported() {
+ return true;
+ }
+
+ @Override
+ public void shutdown() throws Exception {
+ for (KafkaServer broker : brokers) {
+ if (broker != null) {
+ broker.shutdown();
+ }
+ }
+ brokers.clear();
+
+ if (zookeeper != null) {
+ try {
+ zookeeper.stop();
+ }
+ catch (Exception e) {
+ LOG.warn("ZK.stop() failed", e);
+ }
+ zookeeper = null;
+ }
+
+ // clean up the temp spaces
+
+ if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+ try {
+ FileUtils.deleteDirectory(tmpKafkaParent);
+ }
+ catch (Exception e) {
+ // ignore
+ }
+ }
+ if (tmpZkDir != null && tmpZkDir.exists()) {
+ try {
+ FileUtils.deleteDirectory(tmpZkDir);
+ }
+ catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+
+ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder)
throws Exception {
+ Properties kafkaProperties = new Properties();
+
+ // properties have to be Strings
+ kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+ kafkaProperties.put("broker.id", Integer.toString(brokerId));
+ kafkaProperties.put("log.dir", tmpFolder.toString());
+ kafkaProperties.put("zookeeper.connect",
zookeeperConnectionString);
+ kafkaProperties.put("message.max.bytes", String.valueOf(50 *
1024 * 1024));
+ kafkaProperties.put("replica.fetch.max.bytes",
String.valueOf(50 * 1024 * 1024));
+ kafkaProperties.put("transaction.max.timeout.ms",
Integer.toString(1000 * 60 * 60 * 2)); // 2 hours
+
+ // for CI stability, increase zookeeper session timeout
+ kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+ kafkaProperties.put("zookeeper.connection.timeout.ms",
zkTimeout);
+ if (config.getKafkaServerProperties() != null) {
+
kafkaProperties.putAll(config.getKafkaServerProperties());
+ }
+
+ final int numTries = 5;
+
+ for (int i = 1; i <= numTries; i++) {
+ int kafkaPort = NetUtils.getAvailablePort();
+ kafkaProperties.put("port",
Integer.toString(kafkaPort));
+
+ if (config.isHideKafkaBehindProxy()) {
+ NetworkFailuresProxy proxy =
createProxy(KAFKA_HOST, kafkaPort);
+ kafkaProperties.put("advertised.port",
proxy.getLocalPort());
+ }
+
+ //to support secure kafka cluster
+ if (config.isSecureMode()) {
+ LOG.info("Adding Kafka secure configurations");
+ kafkaProperties.put("listeners",
"SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+ kafkaProperties.put("advertised.listeners",
"SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+ kafkaProperties.putAll(getSecureProperties());
+ }
+
+ KafkaConfig kafkaConfig = new
KafkaConfig(kafkaProperties);
+
+ try {
+ scala.Option<String> stringNone =
scala.Option.apply(null);
+ KafkaServer server = new
KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new
ArraySeq<KafkaMetricsReporter>(0));
+ server.startup();
+ return server;
+ }
+ catch (KafkaException e) {
+ if (e.getCause() instanceof BindException) {
+ // port conflict, retry...
+ LOG.info("Port conflict when starting
Kafka Broker. Retrying...");
+ }
+ else {
+ throw e;
+ }
+ }
+ }
+
+ throw new Exception("Could not start Kafka after " + numTries +
" retries due to port conflicts.");
+ }
+
+ private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+ private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+ public KafkaOffsetHandlerImpl() {
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+ offsetClient = new KafkaConsumer<>(props);
+ }
+
+ @Override
+ public Long getCommittedOffset(String topicName, int partition)
{
+ OffsetAndMetadata committed =
offsetClient.committed(new TopicPartition(topicName, partition));
+ return (committed != null) ? committed.offset() : null;
+ }
+
+ @Override
+ public void setCommittedOffset(String topicName, int partition,
long offset) {
+ Map<TopicPartition, OffsetAndMetadata>
partitionAndOffset = new HashMap<>();
+ partitionAndOffset.put(new TopicPartition(topicName,
partition), new OffsetAndMetadata(offset));
+ offsetClient.commitSync(partitionAndOffset);
+ }
+
+ @Override
+ public void close() {
+ offsetClient.close();
+ }
+ }
+}
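
For orientation only (not part of the change set above), the sketch below shows how the new classes would be wired into a job using exactly the constructor shapes exercised by this test environment (getConsumer and produceIntoKafka). Topic names, the bootstrap address and the group id are placeholders, and the Semantic argument is assumed to come from the 0.11 producer that FlinkKafkaProducer10 builds on; check the final sources before relying on it.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer10;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer10;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Collections;
import java.util.Optional;
import java.util.Properties;

public class Kafka10SmokeJob {

    public static void main(String[] args) throws Exception {
        // placeholder connection settings
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "kafka-10-smoke");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // consumer constructor as used in getConsumer(...) above
        DataStream<String> fromKafka = env.addSource(new FlinkKafkaConsumer10<String>(
            Collections.singletonList("demo-in"),
            new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()),
            props));

        // producer constructor as used in produceIntoKafka(...) above; the Semantic
        // enum is assumed to be inherited from FlinkKafkaProducer011
        fromKafka.addSink(new FlinkKafkaProducer10<String>(
            "demo-out",
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            Optional.of(new FlinkFixedPartitioner<String>()),
            FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE,
            FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));

        env.execute("kafka-1.0 smoke job");
    }
}
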
diff --git
a/flink-connectors/flink-connector-kafka-1.0/src/test/resources/log4j-test.properties
b/flink-connectors/flink-connector-kafka-1.0/src/test/resources/log4j-test.properties
new file mode 100644
index 00000000000..fbeb110350f
--- /dev/null
+++
b/flink-connectors/flink-connector-kafka-1.0/src/test/resources/log4j-test.properties
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger
diff --git
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
index cad37f8f8cd..ca308d13ea9 100644
---
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
+++
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -36,6 +36,7 @@
public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
+ public static final String CONNECTOR_VERSION_VALUE_10 = "1.0";
public static final String CONNECTOR_TOPIC = "connector.topic";
public static final String CONNECTOR_STARTUP_MODE =
"connector.startup-mode";
public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST =
"earliest-offset";
@@ -73,7 +74,8 @@ private void validateVersion(DescriptorProperties properties)
{
CONNECTOR_VERSION_VALUE_08,
CONNECTOR_VERSION_VALUE_09,
CONNECTOR_VERSION_VALUE_010,
- CONNECTOR_VERSION_VALUE_011);
+ CONNECTOR_VERSION_VALUE_011,
+ CONNECTOR_VERSION_VALUE_10);
properties.validateEnumValues(CONNECTOR_VERSION(), false,
versions);
properties.validateString(CONNECTOR_TOPIC, false, 1,
Integer.MAX_VALUE);
}
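
With "1.0" accepted by validateVersion(...), a Table API user can request the new connector through the Kafka descriptor in flink-connector-kafka-base. The fragment below is a hedged sketch rather than part of this diff: the fluent method names (version, topic, property) follow that descriptor class, the topic and broker address are placeholders, and the surrounding tableEnv.connect(...).withFormat(...).withSchema(...) calls are omitted.

// org.apache.flink.table.descriptors.Kafka; "1.0" must match CONNECTOR_VERSION_VALUE_10
Kafka kafka = new Kafka()
    .version("1.0")
    .topic("demo-topic")
    .property("bootstrap.servers", "localhost:9092");
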
diff --git
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 7d88f0d94ea..c0df351c625 100644
---
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -184,7 +184,10 @@ public void runFailOnNoBrokerTest() throws Exception {
stream.print();
see.execute("No broker test");
} catch (JobExecutionException jee) {
- if (kafkaServer.getVersion().equals("0.9") ||
kafkaServer.getVersion().equals("0.10") ||
kafkaServer.getVersion().equals("0.11")) {
+ if (kafkaServer.getVersion().equals("0.9") ||
+ kafkaServer.getVersion().equals("0.10") ||
+ kafkaServer.getVersion().equals("0.11") ||
+ kafkaServer.getVersion().equals("1.0")) {
assertTrue(jee.getCause() instanceof
TimeoutException);
TimeoutException te = (TimeoutException)
jee.getCause();
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index cacea91578e..87a90683988 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -56,6 +56,7 @@ under the License.
<module>flink-connector-nifi</module>
<module>flink-connector-cassandra</module>
<module>flink-connector-filesystem</module>
+ <module>flink-connector-kafka-1.0</module>
</modules>
<!-- override these root dependencies as 'provided', so they don't end
up
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
index 211d374de58..8b69e750a9c 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
@@ -46,7 +46,7 @@ object ConnectorDescriptorValidator {
/**
* Key for describing the version of the connector. This property can be
used for different
- * connector versions (e.g. Kafka 0.8 or Kafka 0.11).
+ * connector versions (e.g. Kafka 0.8 or Kafka 1.0).
*/
val CONNECTOR_VERSION = "connector.version"
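
To make that doc comment concrete (again not part of the diff), the flattened property map validated for a Kafka 1.0 table connector would carry entries like the sketch below; the connector.type key and all values are illustrative placeholders, while the remaining keys mirror the constants in KafkaValidator above.

// illustrative only; keys follow KafkaValidator / ConnectorDescriptorValidator
Map<String, String> connectorProperties = new HashMap<>();
connectorProperties.put("connector.type", "kafka");                   // assumed key, not shown in this diff
connectorProperties.put("connector.version", "1.0");                  // CONNECTOR_VERSION_VALUE_10
connectorProperties.put("connector.topic", "demo-topic");             // CONNECTOR_TOPIC
connectorProperties.put("connector.startup-mode", "earliest-offset"); // CONNECTOR_STARTUP_MODE_VALUE_EARLIEST
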
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services