[
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996822#comment-15996822
]
ASF GitHub Bot commented on FLINK-4022:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3746#discussion_r114788919
--- Diff:
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
---
@@ -0,0 +1,842 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
+import
org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the {@link KafkaConsumerThread}.
+ */
+public class KafkaConsumerThreadTest {
+
+ /**
+ * Tests reassignment works correctly in the case when:
+ * - the consumer initially had no assignments
+ * - new unassigned partitions already have defined offsets
+ *
+ * Setting a timeout because the test will not finish if there is logic
error with
+ * the reassignment flow.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 10000)
+ public void
testReassigningPartitionsWithDefinedOffsetsWhenNoInitialAssignment() throws
Exception {
+ final String testTopic = "test-topic";
+
+ // -------- new partitions with defined offsets --------
+
+ KafkaTopicPartitionState<TopicPartition> newPartition1 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 0), new
TopicPartition(testTopic, 0));
+ newPartition1.setOffset(23L);
+
+ KafkaTopicPartitionState<TopicPartition> newPartition2 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 1), new
TopicPartition(testTopic, 1));
+ newPartition2.setOffset(31L);
+
+ final List<KafkaTopicPartitionState<TopicPartition>>
newPartitions = new ArrayList<>(2);
+ newPartitions.add(newPartition1);
+ newPartitions.add(newPartition2);
+
+ // -------- setup mock KafkaConsumer --------
+
+ // no initial assignment
+ final Map<TopicPartition, Long>
mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
+
+ final KafkaConsumer<byte[], byte[]> mockConsumer =
createMockConsumer(
+ mockConsumerAssignmentsAndPositions,
+ Collections.<TopicPartition, Long>emptyMap(),
+ false,
+ null,
+ null);
+
+ // -------- setup new partitions to be polled from the
unassigned partitions queue --------
+
+ final
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>
unassignedPartitionsQueue =
+ new ClosableBlockingQueue<>();
+
+ for (KafkaTopicPartitionState<TopicPartition> newPartition :
newPartitions) {
+ unassignedPartitionsQueue.add(newPartition);
+ }
+
+ // -------- start test --------
+
+ final TestKafkaConsumerThread testThread =
+ new TestKafkaConsumerThread(mockConsumer,
unassignedPartitionsQueue, new Handover());
+ testThread.start();
+
+ testThread.startPartitionReassignment();
+ testThread.waitPartitionReassignmentComplete();
+
+ // verify that the consumer called assign() with all new
partitions, and that positions are correctly advanced
+
+ assertEquals(newPartitions.size(),
mockConsumerAssignmentsAndPositions.size());
+
+ for (KafkaTopicPartitionState<TopicPartition> newPartition :
newPartitions) {
+
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
+
+ // should be seeked to (offset in state + 1) because
offsets in state represent the last processed record
+ assertEquals(
+ newPartition.getOffset() + 1,
+
mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue());
+ }
+
+ assertEquals(0, unassignedPartitionsQueue.size());
+ }
+
+ /**
+ * Tests reassignment works correctly in the case when:
+ * - the consumer initially had no assignments
+ * - new unassigned partitions have undefined offsets (e.g.
EARLIEST_OFFSET sentinel value)
+ *
+ * Setting a timeout because the test will not finish if there is logic
error with
+ * the reassignment flow.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 10000)
+ public void
testReassigningPartitionsWithoutDefinedOffsetsWhenNoInitialAssignment() throws
Exception {
+ final String testTopic = "test-topic";
+
+ // -------- new partitions with undefined offsets --------
+
+ KafkaTopicPartitionState<TopicPartition> newPartition1 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 0), new
TopicPartition(testTopic, 0));
+
newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+
+ KafkaTopicPartitionState<TopicPartition> newPartition2 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 1), new
TopicPartition(testTopic, 1));
+
newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+
+ final List<KafkaTopicPartitionState<TopicPartition>>
newPartitions = new ArrayList<>(2);
+ newPartitions.add(newPartition1);
+ newPartitions.add(newPartition2);
+
+ // -------- setup mock KafkaConsumer --------
+
+ // no initial assignment
+ final Map<TopicPartition, Long>
mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
+
+ // mock retrieved values that should replace the
EARLIEST_OFFSET sentinels
+ final Map<TopicPartition, Long> mockRetrievedPositions = new
HashMap<>();
+
mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
+
mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
+
+ final KafkaConsumer<byte[], byte[]> mockConsumer =
createMockConsumer(
+ mockConsumerAssignmentsAndPositions,
+ mockRetrievedPositions,
+ false,
+ null,
+ null);
+
+ // -------- setup new partitions to be polled from the
unassigned partitions queue --------
+
+ final
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>
unassignedPartitionsQueue =
+ new ClosableBlockingQueue<>();
+
+ for (KafkaTopicPartitionState<TopicPartition> newPartition :
newPartitions) {
+ unassignedPartitionsQueue.add(newPartition);
+ }
+
+ // -------- start test --------
+
+ final TestKafkaConsumerThread testThread =
+ new TestKafkaConsumerThread(mockConsumer,
unassignedPartitionsQueue, new Handover());
+ testThread.start();
+
+ testThread.startPartitionReassignment();
+ testThread.waitPartitionReassignmentComplete();
+
+ // the sentinel offset states should have been replaced with
defined values according to the retrieved values
+
assertEquals(mockRetrievedPositions.get(newPartition1.getKafkaPartitionHandle())
- 1, newPartition1.getOffset());
+
assertEquals(mockRetrievedPositions.get(newPartition2.getKafkaPartitionHandle())
- 1, newPartition2.getOffset());
+
+ // verify that the consumer called assign() with all new
partitions, and that positions are correctly advanced
+
+ assertEquals(newPartitions.size(),
mockConsumerAssignmentsAndPositions.size());
+
+ for (KafkaTopicPartitionState<TopicPartition> newPartition :
newPartitions) {
+
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
+
+ // should be seeked to (offset in state + 1) because
offsets in state represent the last processed record
+ assertEquals(
+ newPartition.getOffset() + 1,
+
mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue());
+ }
+
+ assertEquals(0, unassignedPartitionsQueue.size());
+ }
+
+ /**
+ * Tests reassignment works correctly in the case when:
+ * - the consumer already have some assignments
+ * - new unassigned partitions already have defined offsets
+ *
+ * Setting a timeout because the test will not finish if there is logic
error with
+ * the reassignment flow.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 10000)
+ public void testReassigningPartitionsWithDefinedOffsets() throws
Exception {
+ final String testTopic = "test-topic";
+
+ // -------- old partitions --------
+
+ KafkaTopicPartitionState<TopicPartition> oldPartition1 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 0), new
TopicPartition(testTopic, 0));
+ oldPartition1.setOffset(23L);
+
+ KafkaTopicPartitionState<TopicPartition> oldPartition2 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 1), new
TopicPartition(testTopic, 1));
+ oldPartition2.setOffset(32L);
+
+ List<KafkaTopicPartitionState<TopicPartition>> oldPartitions =
new ArrayList<>(2);
+ oldPartitions.add(oldPartition1);
+ oldPartitions.add(oldPartition2);
+
+ // -------- new partitions with defined offsets --------
+
+ KafkaTopicPartitionState<TopicPartition> newPartition = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 2), new
TopicPartition(testTopic, 2));
+ newPartition.setOffset(29L);
+
+ List<KafkaTopicPartitionState<TopicPartition>> totalPartitions
= new ArrayList<>(3);
+ totalPartitions.add(oldPartition1);
+ totalPartitions.add(oldPartition2);
+ totalPartitions.add(newPartition);
+
+ // -------- setup mock KafkaConsumer --------
+
+ // has initial assignments
+ final Map<TopicPartition, Long>
mockConsumerAssignmentsAndPositions = new HashMap<>();
+ for (KafkaTopicPartitionState<TopicPartition> oldPartition :
oldPartitions) {
+
mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(),
oldPartition.getOffset() + 1);
+ }
+
+ final KafkaConsumer<byte[], byte[]> mockConsumer =
createMockConsumer(
+ mockConsumerAssignmentsAndPositions,
+ Collections.<TopicPartition, Long>emptyMap(),
+ false,
+ null,
+ null);
+
+ // -------- setup new partitions to be polled from the
unassigned partitions queue --------
+
+ final
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>
unassignedPartitionsQueue =
+ new ClosableBlockingQueue<>();
+
+ unassignedPartitionsQueue.add(newPartition);
+
+ // -------- start test --------
+
+ final TestKafkaConsumerThread testThread =
+ new TestKafkaConsumerThread(mockConsumer,
unassignedPartitionsQueue, new Handover());
+ testThread.start();
+
+ testThread.startPartitionReassignment();
+ testThread.waitPartitionReassignmentComplete();
+
+ // verify that the consumer called assign() with all new
partitions, and that positions are correctly advanced
+
+ assertEquals(totalPartitions.size(),
mockConsumerAssignmentsAndPositions.size());
+
+ // old partitions should be re-seeked to their previous
positions
+ for (KafkaTopicPartitionState<TopicPartition> partition :
totalPartitions) {
+
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
+
+ // should be seeked to (offset in state + 1) because
offsets in state represent the last processed record
+ assertEquals(
+ partition.getOffset() + 1,
+
mockConsumerAssignmentsAndPositions.get(partition.getKafkaPartitionHandle()).longValue());
+ }
+
+ assertEquals(0, unassignedPartitionsQueue.size());
+ }
+
+ /**
+ * Tests reassignment works correctly in the case when:
+ * - the consumer already have some assignments
+ * - new unassigned partitions have undefined offsets (e.g.
EARLIEST_OFFSET sentinel value)
+ *
+ * Setting a timeout because the test will not finish if there is logic
error with
+ * the reassignment flow.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 10000)
+ public void testReassigningPartitionsWithoutDefinedOffsets() throws
Exception {
+ final String testTopic = "test-topic";
+
+ // -------- old partitions --------
+
+ KafkaTopicPartitionState<TopicPartition> oldPartition1 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 0), new
TopicPartition(testTopic, 0));
+ oldPartition1.setOffset(23L);
+
+ KafkaTopicPartitionState<TopicPartition> oldPartition2 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 1), new
TopicPartition(testTopic, 1));
+ oldPartition2.setOffset(32L);
+
+ List<KafkaTopicPartitionState<TopicPartition>> oldPartitions =
new ArrayList<>(2);
+ oldPartitions.add(oldPartition1);
+ oldPartitions.add(oldPartition2);
+
+ // -------- new partitions with undefined offsets --------
+
+ KafkaTopicPartitionState<TopicPartition> newPartition = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 2), new
TopicPartition(testTopic, 2));
+
newPartition.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+
+ List<KafkaTopicPartitionState<TopicPartition>> totalPartitions
= new ArrayList<>(3);
+ totalPartitions.add(oldPartition1);
+ totalPartitions.add(oldPartition2);
+ totalPartitions.add(newPartition);
+
+ // -------- setup mock KafkaConsumer --------
+
+ // has initial assignments
+ final Map<TopicPartition, Long>
mockConsumerAssignmentsAndPositions = new HashMap<>();
+ for (KafkaTopicPartitionState<TopicPartition> oldPartition :
oldPartitions) {
+
mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(),
oldPartition.getOffset() + 1);
+ }
+
+ // mock retrieved values that should replace the
EARLIEST_OFFSET sentinels
+ final Map<TopicPartition, Long> mockRetrievedPositions = new
HashMap<>();
+
mockRetrievedPositions.put(newPartition.getKafkaPartitionHandle(), 30L);
+
+ final KafkaConsumer<byte[], byte[]> mockConsumer =
createMockConsumer(
+ mockConsumerAssignmentsAndPositions,
+ mockRetrievedPositions,
+ false,
+ null,
+ null);
+
+ // -------- setup new partitions to be polled from the
unassigned partitions queue --------
+
+ final
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>
unassignedPartitionsQueue =
+ new ClosableBlockingQueue<>();
+
+ unassignedPartitionsQueue.add(newPartition);
+
+ // -------- start test --------
+
+ final TestKafkaConsumerThread testThread =
+ new TestKafkaConsumerThread(mockConsumer,
unassignedPartitionsQueue, new Handover());
+ testThread.start();
+
+ testThread.startPartitionReassignment();
+ testThread.waitPartitionReassignmentComplete();
+
+ // the sentinel offset states should have been replaced with
defined values according to the retrieved positions
+
assertEquals(mockRetrievedPositions.get(newPartition.getKafkaPartitionHandle())
- 1, newPartition.getOffset());
+
+ // verify that the consumer called assign() with all new
partitions, and that positions are correctly advanced
+
+ assertEquals(totalPartitions.size(),
mockConsumerAssignmentsAndPositions.size());
+
+ // old partitions should be re-seeked to their previous
positions
+ for (KafkaTopicPartitionState<TopicPartition> partition :
totalPartitions) {
+
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
+
+ // should be seeked to (offset in state + 1) because
offsets in state represent the last processed record
+ assertEquals(
+ partition.getOffset() + 1,
+
mockConsumerAssignmentsAndPositions.get(partition.getKafkaPartitionHandle()).longValue());
+ }
+
+ assertEquals(0, unassignedPartitionsQueue.size());
+ }
+
+ /**
+ * Tests reassignment works correctly in the case when:
+ * - the consumer already have some assignments
+ * - new unassigned partitions already have defined offsets
+ * - the consumer was woken up prior to the reassignment
+ *
+ * In this case, reassignment should not have occurred at all, and the
consumer retains the original assignment.
+ *
+ * Setting a timeout because the test will not finish if there is logic
error with
+ * the reassignment flow.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 10000)
+ public void
testReassigningPartitionsWithDefinedOffsetsWhenEarlyWakeup() throws Exception {
+ final String testTopic = "test-topic";
+
+ // -------- old partitions --------
+
+ KafkaTopicPartitionState<TopicPartition> oldPartition1 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 0), new
TopicPartition(testTopic, 0));
+ oldPartition1.setOffset(23L);
+
+ KafkaTopicPartitionState<TopicPartition> oldPartition2 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 1), new
TopicPartition(testTopic, 1));
+ oldPartition2.setOffset(32L);
+
+ List<KafkaTopicPartitionState<TopicPartition>> oldPartitions =
new ArrayList<>(2);
+ oldPartitions.add(oldPartition1);
+ oldPartitions.add(oldPartition2);
+
+ // -------- new partitions with defined offsets --------
+
+ KafkaTopicPartitionState<TopicPartition> newPartition = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 2), new
TopicPartition(testTopic, 2));
+ newPartition.setOffset(29L);
+
+ // -------- setup mock KafkaConsumer --------
+
+ // initial assignments
+ final Map<TopicPartition, Long>
mockConsumerAssignmentsToPositions = new LinkedHashMap<>();
+ for (KafkaTopicPartitionState<TopicPartition> oldPartition :
oldPartitions) {
+
mockConsumerAssignmentsToPositions.put(oldPartition.getKafkaPartitionHandle(),
oldPartition.getOffset() + 1);
+ }
+
+ final KafkaConsumer<byte[], byte[]> mockConsumer =
createMockConsumer(
+ mockConsumerAssignmentsToPositions,
+ Collections.<TopicPartition, Long>emptyMap(),
+ true,
+ null,
+ null);
+
+ // -------- setup new partitions to be polled from the
unassigned partitions queue --------
+
+ final
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>
unassignedPartitionsQueue =
+ new ClosableBlockingQueue<>();
+
+ unassignedPartitionsQueue.add(newPartition);
+
+ // -------- start test --------
+
+ final TestKafkaConsumerThread testThread =
+ new TestKafkaConsumerThread(mockConsumer,
unassignedPartitionsQueue, new Handover());
+ testThread.start();
+
+ // pause just before the reassignment so we can inject the
wakeup
+ testThread.waitPartitionReassignmentInvoked();
+
+ testThread.setOffsetsToCommit(new HashMap<TopicPartition,
OffsetAndMetadata>());
+ verify(mockConsumer, times(1)).wakeup();
+
+ testThread.startPartitionReassignment();
+ testThread.waitPartitionReassignmentComplete();
+
+ // the consumer's assignment should have remained untouched
+
+ assertEquals(oldPartitions.size(),
mockConsumerAssignmentsToPositions.size());
+
+ for (KafkaTopicPartitionState<TopicPartition> oldPartition :
oldPartitions) {
+
assertTrue(mockConsumerAssignmentsToPositions.containsKey(oldPartition.getKafkaPartitionHandle()));
+ assertEquals(
+ oldPartition.getOffset() + 1,
+
mockConsumerAssignmentsToPositions.get(oldPartition.getKafkaPartitionHandle()).longValue());
+ }
+
+ // the new partitions should have been re-added to the
unassigned partitions queue
+ assertEquals(1, unassignedPartitionsQueue.size());
+ }
+
+ /**
+ * Tests reassignment works correctly in the case when:
+ * - the consumer has no initial assignments
+ * - new unassigned partitions have undefined offsets
+ * - the consumer was woken up prior to the reassignment
+ *
+ * In this case, reassignment should not have occurred at all, and the
consumer retains the original assignment.
+ *
+ * Setting a timeout because the test will not finish if there is logic
error with
+ * the reassignment flow.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 10000)
+ public void
testReassignPartitionsDefinedOffsetsWithoutInitialAssignmentsWhenEarlyWakeup()
throws Exception {
+ final String testTopic = "test-topic";
+
+ // -------- new partitions with defined offsets --------
+
+ KafkaTopicPartitionState<TopicPartition> newPartition1 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 0), new
TopicPartition(testTopic, 0));
+
newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+
+ KafkaTopicPartitionState<TopicPartition> newPartition2 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 1), new
TopicPartition(testTopic, 1));
+
newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+
+ List<KafkaTopicPartitionState<TopicPartition>> newPartitions =
new ArrayList<>(2);
+ newPartitions.add(newPartition1);
+ newPartitions.add(newPartition2);
+
+ // -------- setup mock KafkaConsumer --------
+
+ // no initial assignments
+ final Map<TopicPartition, Long>
mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
+
+ // mock retrieved values that should replace the
EARLIEST_OFFSET sentinels
+ final Map<TopicPartition, Long> mockRetrievedPositions = new
HashMap<>();
+
mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
+
mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
+
+ final KafkaConsumer<byte[], byte[]> mockConsumer =
createMockConsumer(
+ mockConsumerAssignmentsAndPositions,
+ mockRetrievedPositions,
+ true,
+ null,
+ null);
+
+ // -------- setup new partitions to be polled from the
unassigned partitions queue --------
+
+ final
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>
unassignedPartitionsQueue =
+ new ClosableBlockingQueue<>();
+
+ for (KafkaTopicPartitionState<TopicPartition> newPartition :
newPartitions) {
+ unassignedPartitionsQueue.add(newPartition);
+ }
+
+ // -------- start test --------
+
+ final TestKafkaConsumerThread testThread =
+ new TestKafkaConsumerThread(mockConsumer,
unassignedPartitionsQueue, new Handover());
+ testThread.start();
+
+ // pause just before the reassignment so we can inject the
wakeup
+ testThread.waitPartitionReassignmentInvoked();
+
+ testThread.setOffsetsToCommit(new HashMap<TopicPartition,
OffsetAndMetadata>());
+
+ // make sure the consumer was actually woken up
+ verify(mockConsumer, times(1)).wakeup();
+
+ testThread.startPartitionReassignment();
+ testThread.waitPartitionReassignmentComplete();
+
+ // the consumer's assignment should have remained untouched (in
this case, empty)
+ assertEquals(0, mockConsumerAssignmentsAndPositions.size());
+
+ // the new partitions should have been re-added to the
unassigned partitions queue
+ assertEquals(2, unassignedPartitionsQueue.size());
+ }
+
+ /**
+ * Tests reassignment works correctly in the case when:
+ * - the consumer has no initial assignments
+ * - new unassigned partitions have undefined offsets
+ * - the consumer was woken up during the reassignment
+ *
+ * In this case, reassignment should have completed, and the consumer
is restored the wakeup call after the reassignment.
+ *
+ * Setting a timeout because the test will not finish if there is logic
error with
+ * the reassignment flow.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 10000)
+ public void
testReassignPartitionsDefinedOffsetsWithoutInitialAssignmentsWhenWakeupMidway()
throws Exception {
+ final String testTopic = "test-topic";
+
+ // -------- new partitions with defined offsets --------
+
+ KafkaTopicPartitionState<TopicPartition> newPartition1 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 0), new
TopicPartition(testTopic, 0));
+
newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+
+ KafkaTopicPartitionState<TopicPartition> newPartition2 = new
KafkaTopicPartitionState<>(
+ new KafkaTopicPartition(testTopic, 1), new
TopicPartition(testTopic, 1));
+
newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+
+ List<KafkaTopicPartitionState<TopicPartition>> newPartitions =
new ArrayList<>(2);
+ newPartitions.add(newPartition1);
+ newPartitions.add(newPartition2);
+
+ // -------- setup mock KafkaConsumer --------
+
+ // no initial assignments
+ final Map<TopicPartition, Long>
mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
+
+ // mock retrieved values that should replace the
EARLIEST_OFFSET sentinels
+ final Map<TopicPartition, Long> mockRetrievedPositions = new
HashMap<>();
+
mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
+
mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
+
+ // these latches are used to pause midway the reassignment
process
+ final OneShotLatch midAssignmentLatch = new OneShotLatch();
+ final OneShotLatch continueAssigmentLatch = new OneShotLatch();
+
+ final KafkaConsumer<byte[], byte[]> mockConsumer =
createMockConsumer(
+ mockConsumerAssignmentsAndPositions,
+ mockRetrievedPositions,
+ false,
+ midAssignmentLatch,
+ continueAssigmentLatch);
+
+ // -------- setup new partitions to be polled from the
unassigned partitions queue --------
+
+ final
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>
unassignedPartitionsQueue =
+ new ClosableBlockingQueue<>();
+
+ for (KafkaTopicPartitionState<TopicPartition> newPartition :
newPartitions) {
+ unassignedPartitionsQueue.add(newPartition);
+ }
+
+ // -------- start test --------
+
+ final TestKafkaConsumerThread testThread =
+ new TestKafkaConsumerThread(mockConsumer,
unassignedPartitionsQueue, new Handover());
+ testThread.start();
+
+ testThread.startPartitionReassignment();
+
+ // wait until the reassignment has started
+ midAssignmentLatch.await();
+
+ testThread.setOffsetsToCommit(new HashMap<TopicPartition,
OffsetAndMetadata>());
+
+ // the wakeup the setOffsetsToCommit() call should have been
buffered, and not called on the consumer
--- End diff --
Should this be "the wakeup in the setOffsetsToCommit() call ..."?
> Partition discovery / regex topic subscription for the Kafka consumer
> ---------------------------------------------------------------------
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
> Issue Type: New Feature
> Components: Kafka Connector, Streaming Connectors
> Affects Versions: 1.0.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are
> added to Kafka.
> I propose implementing this feature as described below:
> Since the overall list of partitions to read will change after job
> submission, the main big change required for this feature will be dynamic
> partition assignment to subtasks while the Kafka consumer is running. This
> will mainly be accomplished using Kafka 0.9.x API
> `KafkaConsumer#subscribe(java.util.regex.Pattern,
> ConsumerRebalanceListener)`. Each KafkaConsumer in each subtask will be
> added to the same consumer group when instantiated, and rely on Kafka to
> dynamically reassign partitions to them whenever a rebalance happens. The
> registered `ConsumerRebalanceListener` is a callback that is called right
> before and after rebalancing happens. We'll use this callback to let each
> subtask commit the last offsets of the partitions it is currently responsible for to
> an external store (or Kafka) before a rebalance; after the rebalance, once the
> subtasks get the new partitions they'll be reading from, they'll read from
> the external store to get the last offsets for their new partitions
> (partitions which don't have offset entries in the store are new partitions
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot
> the offsets of partitions they are currently holding. Restoring will be a
> bit different in that subtasks might not be assigned matching partitions to
> the snapshot the subtask is restored with (since we're letting Kafka
> dynamically assign partitions). There will need to be a coordination process
> where, if a restore state exists, all subtasks first commit the offsets they
> receive (as a result of the restore state) to the external store, and then
> all subtasks attempt to find a last offset for the partitions they are holding.
> However, if the globally merged restore state feature mentioned by
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is
> available, then the restore will be simple again, as each subtask has full
> access to the previous global state, and therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to subtasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign
> static partitions, use subscribe() registered with the callback
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics),
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed
> list of topics. We can simply decide which subscribe() overload to use
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't
> emit a MAX_VALUE watermark, since they may hold partitions after a rebalance.
> Instead, un-assigned subtasks should be running a fetcher instance too and
> take part as a process pool for the consumer group of the subscribed topics.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)