Repository: flink
Updated Branches:
  refs/heads/master 08bfdae68 -> 2c734508d


[FLINK-6988][kafka] Add Kafka 0.11 tests for scaling down and up again


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c734508
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c734508
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c734508

Branch: refs/heads/master
Commit: 2c734508d7b6a034748e7d60f2f2075cddf156d8
Parents: 4ada50b
Author: Piotr Nowojski <[email protected]>
Authored: Fri Aug 25 09:47:12 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducer011Tests.java       | 120 +++++++++++++++++++
 1 file changed, 120 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c734508/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
index dd21bf4..69c3ceb 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
@@ -42,11 +43,17 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
@@ -316,6 +323,119 @@ public class FlinkKafkaProducer011Tests extends 
KafkaTestBase {
                deleteTestTopic(topic);
        }
 
+       /**
+        * Each instance of FlinkKafkaProducer011 uses it's own pool of 
transactional ids. After the restore from checkpoint
+        * transactional ids are redistributed across the subtasks. In case of 
scale down, the surplus transactional ids
+        * are dropped. In case of scale up, new one are generated (for the new 
subtasks). This test make sure that sequence
+        * of scaling down and up again works fine. Especially it checks 
whether the newly generated ids in scaling up
+        * do not overlap with ids that were used before scaling down. For 
example we start with 4 ids and parallelism 4:
+        * [1], [2], [3], [4] - one assigned per each subtask
+        * we scale down to parallelism 2:
+        * [1, 2], [3, 4] - first subtask got id 1 and 2, second got ids 3 and 4
+        * surplus ids are dropped from the pools and we scale up to 
parallelism 3:
+        * [1 or 2], [3 or 4], [???]
+        * new subtask have to generate new id(s), but he can not use ids that 
are potentially in use, so it has to generate
+        * new ones that are greater then 4.
+        */
+       @Test(timeout = 120_000L)
+       public void testScaleUpAfterScalingDown() throws Exception {
+               String topic = "scale-down-before-first-checkpoint";
+
+               final int parallelism1 = 4;
+               final int parallelism2 = 2;
+               final int parallelism3 = 3;
+               final int maxParallelism = Math.max(parallelism1, 
Math.max(parallelism2, parallelism3));
+
+               List<OperatorStateHandle> operatorStateHandles = 
repartitionAndExecute(
+                       topic,
+                       Collections.emptyList(),
+                       parallelism1,
+                       maxParallelism,
+                       IntStream.range(0, parallelism1).boxed().iterator());
+
+               operatorStateHandles = repartitionAndExecute(
+                       topic,
+                       operatorStateHandles,
+                       parallelism2,
+                       maxParallelism,
+                       IntStream.range(parallelism1,  parallelism1 + 
parallelism2).boxed().iterator());
+
+               operatorStateHandles = repartitionAndExecute(
+                       topic,
+                       operatorStateHandles,
+                       parallelism3,
+                       maxParallelism,
+                       IntStream.range(parallelism1 + parallelism2,  
parallelism1 + parallelism2 + parallelism3).boxed().iterator());
+
+               // After each previous repartitionAndExecute call, we are left 
with some lingering transactions, that would
+               // not allow us to read all committed messages from the topic. 
Thus we initialize operators from
+               // operatorStateHandles once more, but without any new data. 
This should terminate all ongoing transactions.
+
+               operatorStateHandles = repartitionAndExecute(
+                       topic,
+                       operatorStateHandles,
+                       1,
+                       maxParallelism,
+                       Collections.emptyIterator());
+
+               assertExactlyOnceForTopic(
+                       createProperties(),
+                       topic,
+                       0,
+                       IntStream.range(0, parallelism1 + parallelism2 + 
parallelism3).boxed().collect(Collectors.toList()),
+                       30_000L);
+               deleteTestTopic(topic);
+       }
+
+       private List<OperatorStateHandle> repartitionAndExecute(
+                       String topic,
+                       List<OperatorStateHandle> inputStates,
+                       int parallelism,
+                       int maxParallelism,
+                       Iterator<Integer> inputData) throws Exception {
+
+               List<OperatorStateHandle> outputStates = new ArrayList<>();
+               List<OneInputStreamOperatorTestHarness<Integer, Object>> 
testHarnesses = new ArrayList<>();
+
+               for (int subtaskIndex = 0; subtaskIndex < parallelism; 
subtaskIndex++) {
+                       OneInputStreamOperatorTestHarness<Integer, Object> 
testHarness =
+                               createTestHarness(topic, maxParallelism, 
parallelism, subtaskIndex);
+                       testHarnesses.add(testHarness);
+
+                       testHarness.setup();
+
+                       testHarness.initializeState(new OperatorStateHandles(
+                               0,
+                               Collections.emptyList(),
+                               Collections.emptyList(),
+                               inputStates,
+                               Collections.emptyList()));
+                       testHarness.open();
+
+                       if (inputData.hasNext()) {
+                               int nextValue = inputData.next();
+                               testHarness.processElement(nextValue, 0);
+                               OperatorStateHandles snapshot = 
testHarness.snapshot(0, 0);
+
+                               
outputStates.addAll(snapshot.getManagedOperatorState());
+                               checkState(snapshot.getRawOperatorState() == 
null, "Unexpected raw operator state");
+                               checkState(snapshot.getManagedKeyedState() == 
null, "Unexpected managed keyed state");
+                               checkState(snapshot.getRawKeyedState() == null, 
"Unexpected raw keyed state");
+
+                               for (int i = 1; i < 
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) {
+                                       testHarness.processElement(-nextValue, 
0);
+                                       testHarness.snapshot(i, 0);
+                               }
+                       }
+               }
+
+               for (OneInputStreamOperatorTestHarness<Integer, Object> 
testHarness : testHarnesses) {
+                       testHarness.close();
+               }
+
+               return outputStates;
+       }
+
        @Test
        public void testRecoverCommittedTransaction() throws Exception {
                String topic = 
"flink-kafka-producer-recover-committed-transaction";

Reply via email to