Repository: flink
Updated Branches:
  refs/heads/master 72f56d1fb -> 8bcb2ae3c


http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 9beed22..3bdfbed 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 
 import org.junit.Test;
@@ -25,6 +26,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 
@@ -45,12 +48,18 @@ public class KafkaConsumerPartitionAssignmentTest {
                                        new KafkaTopicPartition("test-topic", 
1));
 
                        for (int i = 0; i < inPartitions.size(); i++) {
-                               List<KafkaTopicPartition> parts = 
-                                               
FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i);
-
-                               assertNotNull(parts);
-                               assertEquals(1, parts.size());
-                               assertTrue(contains(inPartitions, 
parts.get(0).getPartition()));
+                               Map<KafkaTopicPartition, Long> 
subscribedPartitionsToStartOffsets = new HashMap<>();
+                               
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+                                       subscribedPartitionsToStartOffsets,
+                                       inPartitions,
+                                       i,
+                                       inPartitions.size(),
+                                       StartupMode.GROUP_OFFSETS);
+
+                               List<KafkaTopicPartition> subscribedPartitions 
= new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
+
+                               assertEquals(1, subscribedPartitions.size());
+                               assertTrue(contains(inPartitions, 
subscribedPartitions.get(0).getPartition()));
                        }
                }
                catch (Exception e) {
@@ -59,15 +68,6 @@ public class KafkaConsumerPartitionAssignmentTest {
                }
        }
 
-       private boolean contains(List<KafkaTopicPartition> inPartitions, int 
partition) {
-               for (KafkaTopicPartition ktp : inPartitions) {
-                       if (ktp.getPartition() == partition) {
-                               return true;
-                       }
-               }
-               return false;
-       }
-
        @Test
        public void testMultiplePartitionsPerConsumers() {
                try {
@@ -87,14 +87,20 @@ public class KafkaConsumerPartitionAssignmentTest {
                        final int maxPartitionsPerConsumer = partitions.size() 
/ numConsumers + 1;
 
                        for (int i = 0; i < numConsumers; i++) {
-                               List<KafkaTopicPartition> parts = 
-                                               
FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
+                               Map<KafkaTopicPartition, Long> 
subscribedPartitionsToStartOffsets = new HashMap<>();
+                               
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+                                       subscribedPartitionsToStartOffsets,
+                                       partitions,
+                                       i,
+                                       numConsumers,
+                                       StartupMode.GROUP_OFFSETS);
+
+                               List<KafkaTopicPartition> subscribedPartitions 
= new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
-                               assertNotNull(parts);
-                               assertTrue(parts.size() >= 
minPartitionsPerConsumer);
-                               assertTrue(parts.size() <= 
maxPartitionsPerConsumer);
+                               assertTrue(subscribedPartitions.size() >= 
minPartitionsPerConsumer);
+                               assertTrue(subscribedPartitions.size() <= 
maxPartitionsPerConsumer);
 
-                               for (KafkaTopicPartition p : parts) {
+                               for (KafkaTopicPartition p : 
subscribedPartitions) {
                                        // check that the element was actually 
contained
                                        assertTrue(allPartitions.remove(p));
                                }
@@ -124,12 +130,19 @@ public class KafkaConsumerPartitionAssignmentTest {
                        final int numConsumers = 2 * inPartitions.size() + 3;
 
                        for (int i = 0; i < numConsumers; i++) {
-                               List<KafkaTopicPartition> parts = 
FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
+                               Map<KafkaTopicPartition, Long> 
subscribedPartitionsToStartOffsets = new HashMap<>();
+                               
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+                                       subscribedPartitionsToStartOffsets,
+                                       inPartitions,
+                                       i,
+                                       numConsumers,
+                                       StartupMode.GROUP_OFFSETS);
+
+                               List<KafkaTopicPartition> subscribedPartitions 
= new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
-                               assertNotNull(parts);
-                               assertTrue(parts.size() <= 1);
+                               assertTrue(subscribedPartitions.size() <= 1);
 
-                               for (KafkaTopicPartition p : parts) {
+                               for (KafkaTopicPartition p : 
subscribedPartitions) {
                                        // check that the element was actually 
contained
                                        assertTrue(allPartitions.remove(p));
                                }
@@ -148,13 +161,23 @@ public class KafkaConsumerPartitionAssignmentTest {
        public void testAssignEmptyPartitions() {
                try {
                        List<KafkaTopicPartition> ep = new ArrayList<>();
-                       List<KafkaTopicPartition> parts1 = 
FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
-                       assertNotNull(parts1);
-                       assertTrue(parts1.isEmpty());
-
-                       List<KafkaTopicPartition> parts2 = 
FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
-                       assertNotNull(parts2);
-                       assertTrue(parts2.isEmpty());
+                       Map<KafkaTopicPartition, Long> 
subscribedPartitionsToStartOffsets = new HashMap<>();
+                       
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+                               subscribedPartitionsToStartOffsets,
+                               ep,
+                               2,
+                               4,
+                               StartupMode.GROUP_OFFSETS);
+                       
assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
+
+                       subscribedPartitionsToStartOffsets = new HashMap<>();
+                       
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+                               subscribedPartitionsToStartOffsets,
+                               ep,
+                               0,
+                               1,
+                               StartupMode.GROUP_OFFSETS);
+                       
assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -184,33 +207,53 @@ public class KafkaConsumerPartitionAssignmentTest {
                        final int minNewPartitionsPerConsumer = 
newPartitions.size() / numConsumers;
                        final int maxNewPartitionsPerConsumer = 
newPartitions.size() / numConsumers + 1;
 
-                       List<KafkaTopicPartition> parts1 = 
FlinkKafkaConsumerBase.assignPartitions(
-                                       initialPartitions, numConsumers, 0);
-                       List<KafkaTopicPartition> parts2 = 
FlinkKafkaConsumerBase.assignPartitions(
-                                       initialPartitions, numConsumers, 1);
-                       List<KafkaTopicPartition> parts3 = 
FlinkKafkaConsumerBase.assignPartitions(
-                                       initialPartitions, numConsumers, 2);
-
-                       assertNotNull(parts1);
-                       assertNotNull(parts2);
-                       assertNotNull(parts3);
-
-                       assertTrue(parts1.size() >= 
minInitialPartitionsPerConsumer);
-                       assertTrue(parts1.size() <= 
maxInitialPartitionsPerConsumer);
-                       assertTrue(parts2.size() >= 
minInitialPartitionsPerConsumer);
-                       assertTrue(parts2.size() <= 
maxInitialPartitionsPerConsumer);
-                       assertTrue(parts3.size() >= 
minInitialPartitionsPerConsumer);
-                       assertTrue(parts3.size() <= 
maxInitialPartitionsPerConsumer);
-
-                       for (KafkaTopicPartition p : parts1) {
+                       Map<KafkaTopicPartition, Long> 
subscribedPartitionsToStartOffsets1 = new HashMap<>();
+                       Map<KafkaTopicPartition, Long> 
subscribedPartitionsToStartOffsets2 = new HashMap<>();
+                       Map<KafkaTopicPartition, Long> 
subscribedPartitionsToStartOffsets3 = new HashMap<>();
+
+                       
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+                               subscribedPartitionsToStartOffsets1,
+                               initialPartitions,
+                               0,
+                               numConsumers,
+                               StartupMode.GROUP_OFFSETS);
+
+                       
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+                               subscribedPartitionsToStartOffsets2,
+                               initialPartitions,
+                               1,
+                               numConsumers,
+                               StartupMode.GROUP_OFFSETS);
+
+                       
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+                               subscribedPartitionsToStartOffsets3,
+                               initialPartitions,
+                               2,
+                               numConsumers,
+                               StartupMode.GROUP_OFFSETS);
+
+                       List<KafkaTopicPartition> subscribedPartitions1 = new 
ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
+                       List<KafkaTopicPartition> subscribedPartitions2 = new 
ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
+                       List<KafkaTopicPartition> subscribedPartitions3 = new 
ArrayList<>(subscribedPartitionsToStartOffsets3.keySet());
+
+                       assertTrue(subscribedPartitions1.size() >= 
minInitialPartitionsPerConsumer);
+                       assertTrue(subscribedPartitions1.size() <= 
maxInitialPartitionsPerConsumer);
+                       assertTrue(subscribedPartitions2.size() >= 
minInitialPartitionsPerConsumer);
+                       assertTrue(subscribedPartitions2.size() <= 
maxInitialPartitionsPerConsumer);
+                       assertTrue(subscribedPartitions3.size() >= 
minInitialPartitionsPerConsumer);
+                       assertTrue(subscribedPartitions3.size() <= 
maxInitialPartitionsPerConsumer);
+
+                       for (KafkaTopicPartition p : subscribedPartitions1) {
                                // check that the element was actually contained
                                assertTrue(allInitialPartitions.remove(p));
                        }
-                       for (KafkaTopicPartition p : parts2) {
+
+                       for (KafkaTopicPartition p : subscribedPartitions2) {
                                // check that the element was actually contained
                                assertTrue(allInitialPartitions.remove(p));
                        }
-                       for (KafkaTopicPartition p : parts3) {
+
+                       for (KafkaTopicPartition p : subscribedPartitions3) {
                                // check that the element was actually contained
                                assertTrue(allInitialPartitions.remove(p));
                        }
@@ -220,39 +263,61 @@ public class KafkaConsumerPartitionAssignmentTest {
 
                        // grow the set of partitions and distribute anew
 
-                       List<KafkaTopicPartition> parts1new = 
FlinkKafkaConsumerBase.assignPartitions(
-                                       newPartitions, numConsumers, 0);
-                       List<KafkaTopicPartition> parts2new = 
FlinkKafkaConsumerBase.assignPartitions(
-                                       newPartitions, numConsumers, 1);
-                       List<KafkaTopicPartition> parts3new = 
FlinkKafkaConsumerBase.assignPartitions(
-                                       newPartitions, numConsumers, 2);
+                       subscribedPartitionsToStartOffsets1 = new HashMap<>();
+                       subscribedPartitionsToStartOffsets2 = new HashMap<>();
+                       subscribedPartitionsToStartOffsets3 = new HashMap<>();
+
+                       
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+                               subscribedPartitionsToStartOffsets1,
+                               newPartitions,
+                               0,
+                               numConsumers,
+                               StartupMode.GROUP_OFFSETS);
+
+                       
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+                               subscribedPartitionsToStartOffsets2,
+                               newPartitions,
+                               1,
+                               numConsumers,
+                               StartupMode.GROUP_OFFSETS);
+
+                       
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+                               subscribedPartitionsToStartOffsets3,
+                               newPartitions,
+                               2,
+                               numConsumers,
+                               StartupMode.GROUP_OFFSETS);
+
+                       List<KafkaTopicPartition> subscribedPartitions1New = 
new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
+                       List<KafkaTopicPartition> subscribedPartitions2New = 
new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
+                       List<KafkaTopicPartition> subscribedPartitions3New = 
new ArrayList<>(subscribedPartitionsToStartOffsets3.keySet());
 
                        // new partitions must include all old partitions
 
-                       assertTrue(parts1new.size() > parts1.size());
-                       assertTrue(parts2new.size() > parts2.size());
-                       assertTrue(parts3new.size() > parts3.size());
+                       assertTrue(subscribedPartitions1New.size() > 
subscribedPartitions1.size());
+                       assertTrue(subscribedPartitions2New.size() > 
subscribedPartitions2.size());
+                       assertTrue(subscribedPartitions3New.size() > 
subscribedPartitions3.size());
 
-                       assertTrue(parts1new.containsAll(parts1));
-                       assertTrue(parts2new.containsAll(parts2));
-                       assertTrue(parts3new.containsAll(parts3));
+                       
assertTrue(subscribedPartitions1New.containsAll(subscribedPartitions1));
+                       
assertTrue(subscribedPartitions2New.containsAll(subscribedPartitions2));
+                       
assertTrue(subscribedPartitions3New.containsAll(subscribedPartitions3));
 
-                       assertTrue(parts1new.size() >= 
minNewPartitionsPerConsumer);
-                       assertTrue(parts1new.size() <= 
maxNewPartitionsPerConsumer);
-                       assertTrue(parts2new.size() >= 
minNewPartitionsPerConsumer);
-                       assertTrue(parts2new.size() <= 
maxNewPartitionsPerConsumer);
-                       assertTrue(parts3new.size() >= 
minNewPartitionsPerConsumer);
-                       assertTrue(parts3new.size() <= 
maxNewPartitionsPerConsumer);
+                       assertTrue(subscribedPartitions1New.size() >= 
minNewPartitionsPerConsumer);
+                       assertTrue(subscribedPartitions1New.size() <= 
maxNewPartitionsPerConsumer);
+                       assertTrue(subscribedPartitions2New.size() >= 
minNewPartitionsPerConsumer);
+                       assertTrue(subscribedPartitions2New.size() <= 
maxNewPartitionsPerConsumer);
+                       assertTrue(subscribedPartitions3New.size() >= 
minNewPartitionsPerConsumer);
+                       assertTrue(subscribedPartitions3New.size() <= 
maxNewPartitionsPerConsumer);
 
-                       for (KafkaTopicPartition p : parts1new) {
+                       for (KafkaTopicPartition p : subscribedPartitions1New) {
                                // check that the element was actually contained
                                assertTrue(allNewPartitions.remove(p));
                        }
-                       for (KafkaTopicPartition p : parts2new) {
+                       for (KafkaTopicPartition p : subscribedPartitions2New) {
                                // check that the element was actually contained
                                assertTrue(allNewPartitions.remove(p));
                        }
-                       for (KafkaTopicPartition p : parts3new) {
+                       for (KafkaTopicPartition p : subscribedPartitions3New) {
                                // check that the element was actually contained
                                assertTrue(allNewPartitions.remove(p));
                        }
@@ -266,4 +331,13 @@ public class KafkaConsumerPartitionAssignmentTest {
                }
        }
 
+       private boolean contains(List<KafkaTopicPartition> inPartitions, int 
partition) {
+               for (KafkaTopicPartition ktp : inPartitions) {
+                       if (ktp.getPartition() == partition) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 6887518..9e9923d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -44,10 +44,10 @@ public class AbstractFetcherTimestampsTest {
        @Test
        public void testPunctuatedWatermarks() throws Exception {
                final String testTopic = "test topic name";
-               List<KafkaTopicPartition> originalPartitions = Arrays.asList(
-                               new KafkaTopicPartition(testTopic, 7),
-                               new KafkaTopicPartition(testTopic, 13),
-                               new KafkaTopicPartition(testTopic, 21));
+               Map<KafkaTopicPartition, Long> originalPartitions = new 
HashMap<>();
+               originalPartitions.put(new KafkaTopicPartition(testTopic, 7), 
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+               originalPartitions.put(new KafkaTopicPartition(testTopic, 13), 
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+               originalPartitions.put(new KafkaTopicPartition(testTopic, 21), 
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
 
                TestSourceContext<Long> sourceContext = new 
TestSourceContext<>();
 
@@ -56,15 +56,14 @@ public class AbstractFetcherTimestampsTest {
                TestFetcher<Long> fetcher = new TestFetcher<>(
                                sourceContext,
                                originalPartitions,
-                               null,
                                null, /* periodic watermark assigner */
                                new 
SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new 
PunctuatedTestExtractor()),
                                processingTimeProvider,
                                0);
 
-               final KafkaTopicPartitionState<Object> part1 = 
fetcher.subscribedPartitions()[0];
-               final KafkaTopicPartitionState<Object> part2 = 
fetcher.subscribedPartitions()[1];
-               final KafkaTopicPartitionState<Object> part3 = 
fetcher.subscribedPartitions()[2];
+               final KafkaTopicPartitionState<Object> part1 = 
fetcher.subscribedPartitionStates()[0];
+               final KafkaTopicPartitionState<Object> part2 = 
fetcher.subscribedPartitionStates()[1];
+               final KafkaTopicPartitionState<Object> part3 = 
fetcher.subscribedPartitionStates()[2];
 
                // elements generate a watermark if the timestamp is a multiple 
of three
                
@@ -119,10 +118,10 @@ public class AbstractFetcherTimestampsTest {
        @Test
        public void testPeriodicWatermarks() throws Exception {
                final String testTopic = "test topic name";
-               List<KafkaTopicPartition> originalPartitions = Arrays.asList(
-                               new KafkaTopicPartition(testTopic, 7),
-                               new KafkaTopicPartition(testTopic, 13),
-                               new KafkaTopicPartition(testTopic, 21));
+               Map<KafkaTopicPartition, Long> originalPartitions = new 
HashMap<>();
+               originalPartitions.put(new KafkaTopicPartition(testTopic, 7), 
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+               originalPartitions.put(new KafkaTopicPartition(testTopic, 13), 
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+               originalPartitions.put(new KafkaTopicPartition(testTopic, 21), 
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
 
                TestSourceContext<Long> sourceContext = new 
TestSourceContext<>();
 
@@ -131,15 +130,14 @@ public class AbstractFetcherTimestampsTest {
                TestFetcher<Long> fetcher = new TestFetcher<>(
                                sourceContext,
                                originalPartitions,
-                               null,
                                new 
SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new 
PeriodicTestExtractor()),
                                null, /* punctuated watermarks assigner*/
                                processingTimeService,
                                10);
 
-               final KafkaTopicPartitionState<Object> part1 = 
fetcher.subscribedPartitions()[0];
-               final KafkaTopicPartitionState<Object> part2 = 
fetcher.subscribedPartitions()[1];
-               final KafkaTopicPartitionState<Object> part3 = 
fetcher.subscribedPartitions()[2];
+               final KafkaTopicPartitionState<Object> part1 = 
fetcher.subscribedPartitionStates()[0];
+               final KafkaTopicPartitionState<Object> part2 = 
fetcher.subscribedPartitionStates()[1];
+               final KafkaTopicPartitionState<Object> part3 = 
fetcher.subscribedPartitionStates()[2];
 
                // elements generate a watermark if the timestamp is a multiple 
of three
 
@@ -202,8 +200,7 @@ public class AbstractFetcherTimestampsTest {
 
                protected TestFetcher(
                                SourceContext<T> sourceContext,
-                               List<KafkaTopicPartition> assignedPartitions,
-                               HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
+                               Map<KafkaTopicPartition, Long> 
assignedPartitionsWithStartOffsets,
                                
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
                                
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
                                ProcessingTimeService processingTimeProvider,
@@ -211,14 +208,12 @@ public class AbstractFetcherTimestampsTest {
                {
                        super(
                                sourceContext,
-                               assignedPartitions,
-                               restoredSnapshotState,
+                               assignedPartitionsWithStartOffsets,
                                watermarksPeriodic,
                                watermarksPunctuated,
                                processingTimeProvider,
                                autoWatermarkInterval,
                                TestFetcher.class.getClassLoader(),
-                               StartupMode.LATEST,
                                false);
                }
 

Reply via email to