Repository: flink
Updated Branches:
  refs/heads/master 72f56d1fb -> 8bcb2ae3c
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 9beed22..3bdfbed 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 
 import org.junit.Test;
 
@@ -25,6 +26,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 
@@ -45,12 +48,18 @@ public class KafkaConsumerPartitionAssignmentTest {
 				new KafkaTopicPartition("test-topic", 1));
 
 			for (int i = 0; i < inPartitions.size(); i++) {
-				List<KafkaTopicPartition> parts =
-						FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i);
-
-				assertNotNull(parts);
-				assertEquals(1, parts.size());
-				assertTrue(contains(inPartitions, parts.get(0).getPartition()));
+				Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
+				FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+					subscribedPartitionsToStartOffsets,
+					inPartitions,
+					i,
+					inPartitions.size(),
+					StartupMode.GROUP_OFFSETS);
+
+				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
+
+				assertEquals(1, subscribedPartitions.size());
+				assertTrue(contains(inPartitions, subscribedPartitions.get(0).getPartition()));
 			}
 		}
 		catch (Exception e) {
@@ -59,15 +68,6 @@ public class KafkaConsumerPartitionAssignmentTest {
 		}
 	}
 
-	private boolean contains(List<KafkaTopicPartition> inPartitions, int partition) {
-		for (KafkaTopicPartition ktp : inPartitions) {
-			if (ktp.getPartition() == partition) {
-				return true;
-			}
-		}
-		return false;
-	}
-
 	@Test
 	public void testMultiplePartitionsPerConsumers() {
 		try {
@@ -87,14 +87,20 @@ public class KafkaConsumerPartitionAssignmentTest {
 			final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
 
 			for (int i = 0; i < numConsumers; i++) {
-				List<KafkaTopicPartition> parts =
-						FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
+				Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
+				FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+					subscribedPartitionsToStartOffsets,
+					partitions,
+					i,
+					numConsumers,
+					StartupMode.GROUP_OFFSETS);
+
+				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
-				assertNotNull(parts);
-				assertTrue(parts.size() >= minPartitionsPerConsumer);
-				assertTrue(parts.size() <= maxPartitionsPerConsumer);
+				assertTrue(subscribedPartitions.size() >= minPartitionsPerConsumer);
+				assertTrue(subscribedPartitions.size() <= maxPartitionsPerConsumer);
 
-				for (KafkaTopicPartition p : parts) {
+				for (KafkaTopicPartition p : subscribedPartitions) {
 					// check that the element was actually contained
 					assertTrue(allPartitions.remove(p));
 				}
@@ -124,12 +130,19 @@ public class KafkaConsumerPartitionAssignmentTest {
 			final int numConsumers = 2 * inPartitions.size() + 3;
 
 			for (int i = 0; i < numConsumers; i++) {
-				List<KafkaTopicPartition> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
+				Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
+				FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+					subscribedPartitionsToStartOffsets,
+					inPartitions,
+					i,
+					numConsumers,
+					StartupMode.GROUP_OFFSETS);
+
+				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
-				assertNotNull(parts);
-				assertTrue(parts.size() <= 1);
+				assertTrue(subscribedPartitions.size() <= 1);
 
-				for (KafkaTopicPartition p : parts) {
+				for (KafkaTopicPartition p : subscribedPartitions) {
 					// check that the element was actually contained
 					assertTrue(allPartitions.remove(p));
 				}
@@ -148,13 +161,23 @@ public class KafkaConsumerPartitionAssignmentTest {
 	public void testAssignEmptyPartitions() {
 		try {
 			List<KafkaTopicPartition> ep = new ArrayList<>();
-			List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
-			assertNotNull(parts1);
-			assertTrue(parts1.isEmpty());
-
-			List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
-			assertNotNull(parts2);
-			assertTrue(parts2.isEmpty());
+			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				subscribedPartitionsToStartOffsets,
+				ep,
+				2,
+				4,
+				StartupMode.GROUP_OFFSETS);
+			assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
+
+			subscribedPartitionsToStartOffsets = new HashMap<>();
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				subscribedPartitionsToStartOffsets,
+				ep,
+				0,
+				1,
+				StartupMode.GROUP_OFFSETS);
+			assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -184,33 +207,53 @@ public class KafkaConsumerPartitionAssignmentTest {
 			final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
 			final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
 
-			List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(
-					initialPartitions, numConsumers, 0);
-			List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(
-					initialPartitions, numConsumers, 1);
-			List<KafkaTopicPartition> parts3 = FlinkKafkaConsumerBase.assignPartitions(
-					initialPartitions, numConsumers, 2);
-
-			assertNotNull(parts1);
-			assertNotNull(parts2);
-			assertNotNull(parts3);
-
-			assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
-
-			for (KafkaTopicPartition p : parts1) {
+			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets1 = new HashMap<>();
+			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets2 = new HashMap<>();
+			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets3 = new HashMap<>();
+
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				subscribedPartitionsToStartOffsets1,
+				initialPartitions,
+				0,
+				numConsumers,
+				StartupMode.GROUP_OFFSETS);
+
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				subscribedPartitionsToStartOffsets2,
+				initialPartitions,
+				1,
+				numConsumers,
+				StartupMode.GROUP_OFFSETS);
+
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				subscribedPartitionsToStartOffsets3,
+				initialPartitions,
+				2,
+				numConsumers,
+				StartupMode.GROUP_OFFSETS);
+
+			List<KafkaTopicPartition> subscribedPartitions1 = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
+			List<KafkaTopicPartition> subscribedPartitions2 = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
+			List<KafkaTopicPartition> subscribedPartitions3 = new ArrayList<>(subscribedPartitionsToStartOffsets3.keySet());
+
+			assertTrue(subscribedPartitions1.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(subscribedPartitions1.size() <= maxInitialPartitionsPerConsumer);
+			assertTrue(subscribedPartitions2.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(subscribedPartitions2.size() <= maxInitialPartitionsPerConsumer);
+			assertTrue(subscribedPartitions3.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(subscribedPartitions3.size() <= maxInitialPartitionsPerConsumer);
+
+			for (KafkaTopicPartition p : subscribedPartitions1) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
 			}
-			for (KafkaTopicPartition p : parts2) {
+
+			for (KafkaTopicPartition p : subscribedPartitions2) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
 			}
-			for (KafkaTopicPartition p : parts3) {
+
+			for (KafkaTopicPartition p : subscribedPartitions3) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
 			}
@@ -220,39 +263,61 @@ public class KafkaConsumerPartitionAssignmentTest {
 
 			// grow the set of partitions and distribute anew
 
-			List<KafkaTopicPartition> parts1new = FlinkKafkaConsumerBase.assignPartitions(
-					newPartitions, numConsumers, 0);
-			List<KafkaTopicPartition> parts2new = FlinkKafkaConsumerBase.assignPartitions(
-					newPartitions, numConsumers, 1);
-			List<KafkaTopicPartition> parts3new = FlinkKafkaConsumerBase.assignPartitions(
-					newPartitions, numConsumers, 2);
+			subscribedPartitionsToStartOffsets1 = new HashMap<>();
+			subscribedPartitionsToStartOffsets2 = new HashMap<>();
+			subscribedPartitionsToStartOffsets3 = new HashMap<>();
+
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				subscribedPartitionsToStartOffsets1,
+				newPartitions,
+				0,
+				numConsumers,
+				StartupMode.GROUP_OFFSETS);
+
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				subscribedPartitionsToStartOffsets2,
+				newPartitions,
+				1,
+				numConsumers,
+				StartupMode.GROUP_OFFSETS);
+
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				subscribedPartitionsToStartOffsets3,
+				newPartitions,
+				2,
+				numConsumers,
+				StartupMode.GROUP_OFFSETS);
+
+			List<KafkaTopicPartition> subscribedPartitions1New = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
+			List<KafkaTopicPartition> subscribedPartitions2New = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
+			List<KafkaTopicPartition> subscribedPartitions3New = new ArrayList<>(subscribedPartitionsToStartOffsets3.keySet());
 
 			// new partitions must include all old partitions
 
-			assertTrue(parts1new.size() > parts1.size());
-			assertTrue(parts2new.size() > parts2.size());
-			assertTrue(parts3new.size() > parts3.size());
+			assertTrue(subscribedPartitions1New.size() > subscribedPartitions1.size());
+			assertTrue(subscribedPartitions2New.size() > subscribedPartitions2.size());
+			assertTrue(subscribedPartitions3New.size() > subscribedPartitions3.size());
 
-			assertTrue(parts1new.containsAll(parts1));
-			assertTrue(parts2new.containsAll(parts2));
-			assertTrue(parts3new.containsAll(parts3));
+			assertTrue(subscribedPartitions1New.containsAll(subscribedPartitions1));
+			assertTrue(subscribedPartitions2New.containsAll(subscribedPartitions2));
+			assertTrue(subscribedPartitions3New.containsAll(subscribedPartitions3));
 
-			assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
+			assertTrue(subscribedPartitions1New.size() >= minNewPartitionsPerConsumer);
+			assertTrue(subscribedPartitions1New.size() <= maxNewPartitionsPerConsumer);
+			assertTrue(subscribedPartitions2New.size() >= minNewPartitionsPerConsumer);
+			assertTrue(subscribedPartitions2New.size() <= maxNewPartitionsPerConsumer);
+			assertTrue(subscribedPartitions3New.size() >= minNewPartitionsPerConsumer);
+			assertTrue(subscribedPartitions3New.size() <= maxNewPartitionsPerConsumer);
 
-			for (KafkaTopicPartition p : parts1new) {
+			for (KafkaTopicPartition p : subscribedPartitions1New) {
 				// check that the element was actually contained
 				assertTrue(allNewPartitions.remove(p));
 			}
-			for (KafkaTopicPartition p : parts2new) {
+			for (KafkaTopicPartition p : subscribedPartitions2New) {
 				// check that the element was actually contained
 				assertTrue(allNewPartitions.remove(p));
 			}
-			for (KafkaTopicPartition p : parts3new) {
+			for (KafkaTopicPartition p : subscribedPartitions3New) {
 				// check that the element was actually contained
 				assertTrue(allNewPartitions.remove(p));
 			}
@@ -266,4 +331,13 @@ public class KafkaConsumerPartitionAssignmentTest {
 		}
 	}
 
+	private boolean contains(List<KafkaTopicPartition> inPartitions, int partition) {
+		for (KafkaTopicPartition ktp : inPartitions) {
+			if (ktp.getPartition() == partition) {
+				return true;
+			}
+		}
+		return false;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 6887518..9e9923d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -44,10 +44,10 @@ public class AbstractFetcherTimestampsTest {
 	@Test
 	public void testPunctuatedWatermarks() throws Exception {
 		final String testTopic = "test topic name";
-		List<KafkaTopicPartition> originalPartitions = Arrays.asList(
-				new KafkaTopicPartition(testTopic, 7),
-				new KafkaTopicPartition(testTopic, 13),
-				new KafkaTopicPartition(testTopic, 21));
+		Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
 
 		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
 
@@ -56,15 +56,14 @@ public class AbstractFetcherTimestampsTest {
 		TestFetcher<Long> fetcher = new TestFetcher<>(
 				sourceContext,
 				originalPartitions,
-				null,
 				null, /* periodic watermark assigner */
 				new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
 				processingTimeProvider,
 				0);
 
-		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
-		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
-		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
+		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
+		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
 
 		// elements generate a watermark if the timestamp is a multiple of three
 
@@ -119,10 +118,10 @@ public class AbstractFetcherTimestampsTest {
 	@Test
 	public void testPeriodicWatermarks() throws Exception {
 		final String testTopic = "test topic name";
-		List<KafkaTopicPartition> originalPartitions = Arrays.asList(
-				new KafkaTopicPartition(testTopic, 7),
-				new KafkaTopicPartition(testTopic, 13),
-				new KafkaTopicPartition(testTopic, 21));
+		Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
 
 		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
 
@@ -131,15 +130,14 @@ public class AbstractFetcherTimestampsTest {
 		TestFetcher<Long> fetcher = new TestFetcher<>(
 				sourceContext,
 				originalPartitions,
-				null,
 				new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
 				null, /* punctuated watermarks assigner*/
 				processingTimeService,
 				10);
 
-		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
-		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
-		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
+		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
+		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
 
 		// elements generate a watermark if the timestamp is a multiple of three
 
@@ -202,8 +200,7 @@ public class AbstractFetcherTimestampsTest {
 
 		protected TestFetcher(
 				SourceContext<T> sourceContext,
-				List<KafkaTopicPartition> assignedPartitions,
-				HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+				Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
 				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 				ProcessingTimeService processingTimeProvider,
@@ -211,14 +208,12 @@ public class AbstractFetcherTimestampsTest {
 		{
 			super(
 				sourceContext,
-				assignedPartitions,
-				restoredSnapshotState,
+				assignedPartitionsWithStartOffsets,
 				watermarksPeriodic,
 				watermarksPunctuated,
 				processingTimeProvider,
 				autoWatermarkInterval,
 				TestFetcher.class.getClassLoader(),
-				StartupMode.LATEST,
 				false);
 		}
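
A note for readers following this change: the tests above assert that each subtask receives between floor(p/n) and floor(p/n) + 1 of the p partitions, and that a subtask's assignment after the partition list grows is a superset of its earlier assignment. Below is a minimal sketch of an assignment rule that satisfies those invariants; it is an illustration, not the actual FlinkKafkaConsumerBase implementation. The round-robin rule (partition index modulo subtask count), the import path of KafkaTopicPartitionStateSentinel, and its GROUP_OFFSET constant are assumptions inferred from the asserted bounds and from the LATEST_OFFSET sentinel used in AbstractFetcherTimestampsTest.

// Illustrative sketch only -- not the actual FlinkKafkaConsumerBase code.
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;

import java.util.List;
import java.util.Map;

final class PartitionAssignmentSketch {

	static void initializeSubscribedPartitionsToStartOffsets(
			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
			List<KafkaTopicPartition> kafkaTopicPartitions,
			int indexOfThisSubtask,
			int numParallelSubtasks,
			StartupMode startupMode) {

		// Only the GROUP_OFFSETS mode exercised by the tests is handled here;
		// the GROUP_OFFSET sentinel constant is an assumption (see note above).
		for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
			// Round-robin by partition index: subtask k takes partitions
			// k, k + n, k + 2n, ... so it ends up with floor(p/n) or
			// floor(p/n) + 1 partitions, matching the asserted bounds.
			if (i % numParallelSubtasks == indexOfThisSubtask) {
				subscribedPartitionsToStartOffsets.put(
					kafkaTopicPartitions.get(i),
					KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
			}
		}
	}
}

Because this rule depends only on a partition's index, growing the partition list leaves every previously assigned partition with the same subtask, which is what the growing-partitions test above relies on.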
