[FLINK-9295][kafka] Fix transactional.id collisions for FlinkKafkaProducer011
Previously if there were two completely independent FlinkKafkaProducer011 data sinks in the job graph, their transactional.id would collide with one another. The fix is to use the operator's unique ID along with the task name and subtask id. This change is backward compatible for recovering from older savepoints, since transactional.ids generated by the old generator will still be used after restoring from state. This closes #5977. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d0cd584 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d0cd584 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d0cd584 Branch: refs/heads/release-1.5 Commit: 9d0cd5848d2eb620263dbb65c8ceb611fa440875 Parents: f04dfb5 Author: Piotr Nowojski <[email protected]> Authored: Tue May 8 17:49:31 2018 +0200 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue May 22 16:54:40 2018 +0800 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaProducer011.java | 2 +- .../kafka/FlinkKafkaProducer011ITCase.java | 4 +- .../Kafka011ProducerExactlyOnceITCase.java | 6 +++ .../connectors/kafka/KafkaProducerTestBase.java | 55 +++++++++++--------- .../util/AbstractStreamOperatorTestHarness.java | 31 +++++++++-- .../util/OneInputStreamOperatorTestHarness.java | 17 ++++-- 6 files changed, 81 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index 0ae5e03b..8497372 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -837,7 +837,7 @@ public class FlinkKafkaProducer011<IN> nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState( NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); transactionalIdsGenerator = new TransactionalIdsGenerator( - getRuntimeContext().getTaskName(), + getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(), getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), kafkaProducersPoolSize, http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java index 36cb362..74c58ad 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import 
org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -627,7 +628,8 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { maxParallelism, parallelism, subtaskIndex, - IntSerializer.INSTANCE); + IntSerializer.INSTANCE, + new OperatorID(42, 44)); } private Properties createProperties() { http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java index 1167238..5038b7f 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.junit.BeforeClass; +import org.junit.Test; /** * IT cases for the {@link FlinkKafkaProducer011}. @@ -48,4 +49,9 @@ public class Kafka011ProducerExactlyOnceITCase extends KafkaProducerTestBase { // that it doesn't work well with some weird network failures, or the NetworkFailureProxy is a broken design // and this test should be reimplemented in completely different way... 
} + + @Test + public void testMultipleSinkOperators() throws Exception { + testExactlyOnce(false, 2); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java index 5023a7e..0807eb4 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java @@ -303,7 +303,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { */ @Test public void testExactlyOnceRegularSink() throws Exception { - testExactlyOnce(true); + testExactlyOnce(true, 1); } /** @@ -311,20 +311,22 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { */ @Test public void testExactlyOnceCustomOperator() throws Exception { - testExactlyOnce(false); + testExactlyOnce(false, 1); } /** * This test sets KafkaProducer so that it will automatically flush the data and * and fails the broker to check whether flushed records since last checkpoint were not duplicated. */ - protected void testExactlyOnce(boolean regularSink) throws Exception { - final String topic = regularSink ? "exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator"; + protected void testExactlyOnce(boolean regularSink, int sinksCount) throws Exception { + final String topic = (regularSink ? 
"exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator") + sinksCount; final int partition = 0; final int numElements = 1000; final int failAfterElements = 333; - createTestTopic(topic, 1, 1); + for (int i = 0; i < sinksCount; i++) { + createTestTopic(topic + i, 1, 1); + } TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema); @@ -346,32 +348,35 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { .addSource(new IntegerSource(numElements)) .map(new FailingIdentityMapper<Integer>(failAfterElements)); - FlinkKafkaPartitioner<Integer> partitioner = new FlinkKafkaPartitioner<Integer>() { - @Override - public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) { - return partition; + for (int i = 0; i < sinksCount; i++) { + FlinkKafkaPartitioner<Integer> partitioner = new FlinkKafkaPartitioner<Integer>() { + @Override + public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) { + return partition; + } + }; + + if (regularSink) { + StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic + i, keyedSerializationSchema, properties, partitioner); + inputStream.addSink(kafkaSink.getUserFunction()); + } else { + kafkaServer.produceIntoKafka(inputStream, topic + i, keyedSerializationSchema, properties, partitioner); } - }; - if (regularSink) { - StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, partitioner); - inputStream.addSink(kafkaSink.getUserFunction()); - } - else { - kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, partitioner); } FailingIdentityMapper.failedBefore = false; TestUtils.tryExecute(env, "Exactly once test"); - // assert that before failure we successfully 
snapshot/flushed all expected elements - assertExactlyOnceForTopic( - properties, - topic, - partition, - expectedElements, - KAFKA_READ_TIMEOUT); - - deleteTestTopic(topic); + for (int i = 0; i < sinksCount; i++) { + // assert that before failure we successfully snapshot/flushed all expected elements + assertExactlyOnceForTopic( + properties, + topic + i, + partition, + expectedElements, + KAFKA_READ_TIMEOUT); + deleteTestTopic(topic + i); + } } private List<Integer> getIntegersSequence(int size) { http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 26ad3ab..0c4ecc0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -147,19 +147,42 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { .setParallelism(parallelism) .setSubtaskIndex(subtaskIndex) .build(), - true); + true, + new OperatorID()); + } + + public AbstractStreamOperatorTestHarness( + StreamOperator<OUT> operator, + int maxParallelism, + int parallelism, + int subtaskIndex, + OperatorID operatorID) throws Exception { + this( + operator, + new MockEnvironmentBuilder() + .setTaskName("MockTask") + .setMemorySize(3 * 1024 * 1024) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(1024) + .setMaxParallelism(maxParallelism) + .setParallelism(parallelism) + .setSubtaskIndex(subtaskIndex) + .build(), + true, + operatorID); } public AbstractStreamOperatorTestHarness( 
StreamOperator<OUT> operator, MockEnvironment env) throws Exception { - this(operator, env, false); + this(operator, env, false, new OperatorID()); } private AbstractStreamOperatorTestHarness( StreamOperator<OUT> operator, MockEnvironment env, - boolean environmentIsInternal) throws Exception { + boolean environmentIsInternal, + OperatorID operatorID) throws Exception { this.operator = operator; this.outputList = new ConcurrentLinkedQueue<>(); this.sideOutputLists = new HashMap<>(); @@ -167,7 +190,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { Configuration underlyingConfig = env.getTaskConfiguration(); this.config = new StreamConfig(underlyingConfig); this.config.setCheckpointingEnabled(true); - this.config.setOperatorID(new OperatorID()); + this.config.setOperatorID(operatorID); this.executionConfig = env.getExecutionConfig(); this.closableRegistry = new CloseableRegistry(); this.checkpointLock = new Object(); http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 66d2f69..0155198 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.util; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; 
import org.apache.flink.streaming.api.watermark.Watermark; @@ -54,8 +55,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> int maxParallelism, int parallelism, int subtaskIndex, - TypeSerializer<IN> typeSerializerIn) throws Exception { - this(operator, maxParallelism, parallelism, subtaskIndex); + TypeSerializer<IN> typeSerializerIn, + OperatorID operatorID) throws Exception { + this(operator, maxParallelism, parallelism, subtaskIndex, operatorID); config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn)); } @@ -78,7 +80,16 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> int maxParallelism, int parallelism, int subtaskIndex) throws Exception { - super(operator, maxParallelism, parallelism, subtaskIndex); + this(operator, maxParallelism, parallelism, subtaskIndex, new OperatorID()); + } + + public OneInputStreamOperatorTestHarness( + OneInputStreamOperator<IN, OUT> operator, + int maxParallelism, + int parallelism, + int subtaskIndex, + OperatorID operatorID) throws Exception { + super(operator, maxParallelism, parallelism, subtaskIndex, operatorID); this.oneInputOperator = operator; }
