Repository: flink Updated Branches: refs/heads/master 08bfdae68 -> 2c734508d
[FLINK-6988][kafka] Add Kafka 0.11 tests for scaling down and up again Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c734508 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c734508 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c734508 Branch: refs/heads/master Commit: 2c734508d7b6a034748e7d60f2f2075cddf156d8 Parents: 4ada50b Author: Piotr Nowojski <[email protected]> Authored: Fri Aug 25 09:47:12 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Oct 9 18:58:36 2017 +0200 ---------------------------------------------------------------------- .../kafka/FlinkKafkaProducer011Tests.java | 120 +++++++++++++++++++ 1 file changed, 120 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2c734508/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java index dd21bf4..69c3ceb 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import 
org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; @@ -42,11 +43,17 @@ import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; +import java.util.List; import java.util.Properties; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + import static org.apache.flink.util.Preconditions.checkState; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -316,6 +323,119 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase { deleteTestTopic(topic); } + /** + * Each instance of FlinkKafkaProducer011 uses its own pool of transactional ids. After the restore from checkpoint + * transactional ids are redistributed across the subtasks. In case of scale down, the surplus transactional ids + * are dropped. In case of scale up, new ones are generated (for the new subtasks). This test makes sure that the sequence + * of scaling down and up again works fine. Especially it checks whether the newly generated ids in scaling up + * do not overlap with ids that were used before scaling down. For example we start with 4 ids and parallelism 4: + * [1], [2], [3], [4] - one assigned per each subtask + * we scale down to parallelism 2: + * [1, 2], [3, 4] - first subtask got ids 1 and 2, second got ids 3 and 4 + * surplus ids are dropped from the pools and we scale up to parallelism 3: + * [1 or 2], [3 or 4], [???] + * a new subtask has to generate new id(s), but it cannot use ids that are potentially in use, so it has to generate + * new ones that are greater than 4. 
+ */ + @Test(timeout = 120_000L) + public void testScaleUpAfterScalingDown() throws Exception { + String topic = "scale-down-before-first-checkpoint"; + + final int parallelism1 = 4; + final int parallelism2 = 2; + final int parallelism3 = 3; + final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3)); + + List<OperatorStateHandle> operatorStateHandles = repartitionAndExecute( + topic, + Collections.emptyList(), + parallelism1, + maxParallelism, + IntStream.range(0, parallelism1).boxed().iterator()); + + operatorStateHandles = repartitionAndExecute( + topic, + operatorStateHandles, + parallelism2, + maxParallelism, + IntStream.range(parallelism1, parallelism1 + parallelism2).boxed().iterator()); + + operatorStateHandles = repartitionAndExecute( + topic, + operatorStateHandles, + parallelism3, + maxParallelism, + IntStream.range(parallelism1 + parallelism2, parallelism1 + parallelism2 + parallelism3).boxed().iterator()); + + // After each previous repartitionAndExecute call, we are left with some lingering transactions, that would + // not allow us to read all committed messages from the topic. Thus we initialize operators from + // operatorStateHandles once more, but without any new data. This should terminate all ongoing transactions. 
+ + operatorStateHandles = repartitionAndExecute( + topic, + operatorStateHandles, + 1, + maxParallelism, + Collections.emptyIterator()); + + assertExactlyOnceForTopic( + createProperties(), + topic, + 0, + IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()), + 30_000L); + deleteTestTopic(topic); + } + + private List<OperatorStateHandle> repartitionAndExecute( + String topic, + List<OperatorStateHandle> inputStates, + int parallelism, + int maxParallelism, + Iterator<Integer> inputData) throws Exception { + + List<OperatorStateHandle> outputStates = new ArrayList<>(); + List<OneInputStreamOperatorTestHarness<Integer, Object>> testHarnesses = new ArrayList<>(); + + for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) { + OneInputStreamOperatorTestHarness<Integer, Object> testHarness = + createTestHarness(topic, maxParallelism, parallelism, subtaskIndex); + testHarnesses.add(testHarness); + + testHarness.setup(); + + testHarness.initializeState(new OperatorStateHandles( + 0, + Collections.emptyList(), + Collections.emptyList(), + inputStates, + Collections.emptyList())); + testHarness.open(); + + if (inputData.hasNext()) { + int nextValue = inputData.next(); + testHarness.processElement(nextValue, 0); + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + + outputStates.addAll(snapshot.getManagedOperatorState()); + checkState(snapshot.getRawOperatorState() == null, "Unexpected raw operator state"); + checkState(snapshot.getManagedKeyedState() == null, "Unexpected managed keyed state"); + checkState(snapshot.getRawKeyedState() == null, "Unexpected raw keyed state"); + + for (int i = 1; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) { + testHarness.processElement(-nextValue, 0); + testHarness.snapshot(i, 0); + } + } + } + + for (OneInputStreamOperatorTestHarness<Integer, Object> testHarness : testHarnesses) { + testHarness.close(); + } + + return outputStates; + } + @Test 
public void testRecoverCommittedTransaction() throws Exception { String topic = "flink-kafka-producer-recover-committed-transaction";
