[FLINK-9295][kafka] Fix transactional.id collisions for FlinkKafkaProducer011

Previously, if there were two completely independent FlinkKafkaProducer011 data sinks
in the job graph, their transactional.id values would collide with one another. The fix is
to use the operator's unique ID as well, along with the task name and subtask id.

This change is backward compatible for recovering from older savepoints,
since transactional.ids generated by the old generator will still be used
after restoring from state.

This closes #5977.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d0cd584
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d0cd584
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d0cd584

Branch: refs/heads/release-1.5
Commit: 9d0cd5848d2eb620263dbb65c8ceb611fa440875
Parents: f04dfb5
Author: Piotr Nowojski <[email protected]>
Authored: Tue May 8 17:49:31 2018 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 22 16:54:40 2018 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java |  2 +-
 .../kafka/FlinkKafkaProducer011ITCase.java      |  4 +-
 .../Kafka011ProducerExactlyOnceITCase.java      |  6 +++
 .../connectors/kafka/KafkaProducerTestBase.java | 55 +++++++++++---------
 .../util/AbstractStreamOperatorTestHarness.java | 31 +++++++++--
 .../util/OneInputStreamOperatorTestHarness.java | 17 ++++--
 6 files changed, 81 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 0ae5e03b..8497372 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -837,7 +837,7 @@ public class FlinkKafkaProducer011<IN>
                nextTransactionalIdHintState = 
context.getOperatorStateStore().getUnionListState(
                        NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
                transactionalIdsGenerator = new TransactionalIdsGenerator(
-                       getRuntimeContext().getTaskName(),
+                       getRuntimeContext().getTaskName() + "-" + 
((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
                        getRuntimeContext().getIndexOfThisSubtask(),
                        getRuntimeContext().getNumberOfParallelSubtasks(),
                        kafkaProducersPoolSize,

http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 36cb362..74c58ad 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -627,7 +628,8 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
                        maxParallelism,
                        parallelism,
                        subtaskIndex,
-                       IntSerializer.INSTANCE);
+                       IntSerializer.INSTANCE,
+                       new OperatorID(42, 44));
        }
 
        private Properties createProperties() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
index 1167238..5038b7f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.BeforeClass;
+import org.junit.Test;
 
 /**
  * IT cases for the {@link FlinkKafkaProducer011}.
@@ -48,4 +49,9 @@ public class Kafka011ProducerExactlyOnceITCase extends 
KafkaProducerTestBase {
                // that it doesn't work well with some weird network failures, 
or the NetworkFailureProxy is a broken design
                // and this test should be reimplemented in completely 
different way...
        }
+
+       @Test
+       public void testMultipleSinkOperators() throws Exception {
+               testExactlyOnce(false, 2);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 5023a7e..0807eb4 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -303,7 +303,7 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
         */
        @Test
        public void testExactlyOnceRegularSink() throws Exception {
-               testExactlyOnce(true);
+               testExactlyOnce(true, 1);
        }
 
        /**
@@ -311,20 +311,22 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
         */
        @Test
        public void testExactlyOnceCustomOperator() throws Exception {
-               testExactlyOnce(false);
+               testExactlyOnce(false, 1);
        }
 
        /**
         * This test sets KafkaProducer so that it will  automatically flush 
the data and
         * and fails the broker to check whether flushed records since last 
checkpoint were not duplicated.
         */
-       protected void testExactlyOnce(boolean regularSink) throws Exception {
-               final String topic = regularSink ? 
"exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator";
+       protected void testExactlyOnce(boolean regularSink, int sinksCount) 
throws Exception {
+               final String topic = (regularSink ? 
"exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator") + sinksCount;
                final int partition = 0;
                final int numElements = 1000;
                final int failAfterElements = 333;
 
-               createTestTopic(topic, 1, 1);
+               for (int i = 0; i < sinksCount; i++) {
+                       createTestTopic(topic + i, 1, 1);
+               }
 
                TypeInformationSerializationSchema<Integer> schema = new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new 
ExecutionConfig());
                KeyedSerializationSchema<Integer> keyedSerializationSchema = 
new KeyedSerializationSchemaWrapper(schema);
@@ -346,32 +348,35 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                        .addSource(new IntegerSource(numElements))
                        .map(new 
FailingIdentityMapper<Integer>(failAfterElements));
 
-               FlinkKafkaPartitioner<Integer> partitioner = new 
FlinkKafkaPartitioner<Integer>() {
-                       @Override
-                       public int partition(Integer record, byte[] key, byte[] 
value, String targetTopic, int[] partitions) {
-                               return partition;
+               for (int i = 0; i < sinksCount; i++) {
+                       FlinkKafkaPartitioner<Integer> partitioner = new 
FlinkKafkaPartitioner<Integer>() {
+                               @Override
+                               public int partition(Integer record, byte[] 
key, byte[] value, String targetTopic, int[] partitions) {
+                                       return partition;
+                               }
+                       };
+
+                       if (regularSink) {
+                               StreamSink<Integer> kafkaSink = 
kafkaServer.getProducerSink(topic + i, keyedSerializationSchema, properties, 
partitioner);
+                               
inputStream.addSink(kafkaSink.getUserFunction());
+                       } else {
+                               kafkaServer.produceIntoKafka(inputStream, topic 
+ i, keyedSerializationSchema, properties, partitioner);
                        }
-               };
-               if (regularSink) {
-                       StreamSink<Integer> kafkaSink = 
kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, 
partitioner);
-                       inputStream.addSink(kafkaSink.getUserFunction());
-               }
-               else {
-                       kafkaServer.produceIntoKafka(inputStream, topic, 
keyedSerializationSchema, properties, partitioner);
                }
 
                FailingIdentityMapper.failedBefore = false;
                TestUtils.tryExecute(env, "Exactly once test");
 
-               // assert that before failure we successfully snapshot/flushed 
all expected elements
-               assertExactlyOnceForTopic(
-                       properties,
-                       topic,
-                       partition,
-                       expectedElements,
-                       KAFKA_READ_TIMEOUT);
-
-               deleteTestTopic(topic);
+               for (int i = 0; i < sinksCount; i++) {
+                       // assert that before failure we successfully 
snapshot/flushed all expected elements
+                       assertExactlyOnceForTopic(
+                               properties,
+                               topic + i,
+                               partition,
+                               expectedElements,
+                               KAFKA_READ_TIMEOUT);
+                       deleteTestTopic(topic + i);
+               }
        }
 
        private List<Integer> getIntegersSequence(int size) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 26ad3ab..0c4ecc0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -147,19 +147,42 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                                .setParallelism(parallelism)
                                .setSubtaskIndex(subtaskIndex)
                                .build(),
-                       true);
+                       true,
+                       new OperatorID());
+       }
+
+       public AbstractStreamOperatorTestHarness(
+                       StreamOperator<OUT> operator,
+                       int maxParallelism,
+                       int parallelism,
+                       int subtaskIndex,
+                       OperatorID operatorID) throws Exception {
+               this(
+                       operator,
+                       new MockEnvironmentBuilder()
+                               .setTaskName("MockTask")
+                               .setMemorySize(3 * 1024 * 1024)
+                               .setInputSplitProvider(new 
MockInputSplitProvider())
+                               .setBufferSize(1024)
+                               .setMaxParallelism(maxParallelism)
+                               .setParallelism(parallelism)
+                               .setSubtaskIndex(subtaskIndex)
+                               .build(),
+                       true,
+                       operatorID);
        }
 
        public AbstractStreamOperatorTestHarness(
                        StreamOperator<OUT> operator,
                        MockEnvironment env) throws Exception {
-               this(operator, env, false);
+               this(operator, env, false, new OperatorID());
        }
 
        private AbstractStreamOperatorTestHarness(
                        StreamOperator<OUT> operator,
                        MockEnvironment env,
-                       boolean environmentIsInternal) throws Exception {
+                       boolean environmentIsInternal,
+                       OperatorID operatorID) throws Exception {
                this.operator = operator;
                this.outputList = new ConcurrentLinkedQueue<>();
                this.sideOutputLists = new HashMap<>();
@@ -167,7 +190,7 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                Configuration underlyingConfig = env.getTaskConfiguration();
                this.config = new StreamConfig(underlyingConfig);
                this.config.setCheckpointingEnabled(true);
-               this.config.setOperatorID(new OperatorID());
+               this.config.setOperatorID(operatorID);
                this.executionConfig = env.getExecutionConfig();
                this.closableRegistry = new CloseableRegistry();
                this.checkpointLock = new Object();

http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 66d2f69..0155198 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -54,8 +55,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
                int maxParallelism,
                int parallelism,
                int subtaskIndex,
-               TypeSerializer<IN> typeSerializerIn) throws Exception {
-               this(operator, maxParallelism, parallelism, subtaskIndex);
+               TypeSerializer<IN> typeSerializerIn,
+               OperatorID operatorID) throws Exception {
+               this(operator, maxParallelism, parallelism, subtaskIndex, 
operatorID);
 
                
config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
        }
@@ -78,7 +80,16 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
                        int maxParallelism,
                        int parallelism,
                        int subtaskIndex) throws Exception {
-               super(operator, maxParallelism, parallelism, subtaskIndex);
+               this(operator, maxParallelism, parallelism, subtaskIndex, new 
OperatorID());
+       }
+
+       public OneInputStreamOperatorTestHarness(
+                       OneInputStreamOperator<IN, OUT> operator,
+                       int maxParallelism,
+                       int parallelism,
+                       int subtaskIndex,
+                       OperatorID operatorID) throws Exception {
+               super(operator, maxParallelism, parallelism, subtaskIndex, 
operatorID);
 
                this.oneInputOperator = operator;
        }

Reply via email to