This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fe0b704  KAFKA-9645: Fallback to unsubscribe during Task Migrated 
(#8220)
fe0b704 is described below

commit fe0b704285ebc916ce5080a5248d91b4dc3c60e0
Author: Boyang Chen <[email protected]>
AuthorDate: Sat Mar 7 08:08:23 2020 -0800

    KAFKA-9645: Fallback to unsubscribe during Task Migrated (#8220)
    
    After #7312, we could still return data during the rebalance phase, which 
means it is possible to find records without corresponding tasks. We have 
to fall back to the unsubscribe mode when a task is migrated, as the assignment 
should be cleared out to stay in sync with the task manager state.
    
    Reviewers: A. Sophie Blee-Goldman <[email protected]>, Guozhang Wang 
<[email protected]>
---
 .../kafka/clients/consumer/MockConsumer.java       | 24 ++++-----
 .../streams/processor/internals/StreamThread.java  |  5 +-
 .../processor/internals/StreamThreadTest.java      | 61 ++++++++++++++++++++++
 3 files changed, 76 insertions(+), 14 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index b8579c4..33702c3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -66,6 +66,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private KafkaException offsetsException;
     private AtomicBoolean wakeup;
     private boolean closed;
+    private boolean shouldRebalance;
 
     public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
         this.subscriptions = new SubscriptionState(new LogContext(), 
offsetResetStrategy);
@@ -79,6 +80,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         this.pollException = null;
         this.wakeup = new AtomicBoolean(false);
         this.committed = new HashMap<>();
+        this.shouldRebalance = false;
     }
 
     @Override
@@ -356,21 +358,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> 
{
         subscriptions.requestOffsetReset(partitions, 
OffsetResetStrategy.LATEST);
     }
 
-    // needed for cases where you make a second call to endOffsets
-    public synchronized void addEndOffsets(final Map<TopicPartition, Long> 
newOffsets) {
-        innerUpdateEndOffsets(newOffsets, false);
-    }
-
     public synchronized void updateEndOffsets(final Map<TopicPartition, Long> 
newOffsets) {
-        innerUpdateEndOffsets(newOffsets, true);
-    }
-
-    private void innerUpdateEndOffsets(final Map<TopicPartition, Long> 
newOffsets,
-                                       final boolean replace) {
-
         for (final Map.Entry<TopicPartition, Long> entry : 
newOffsets.entrySet()) {
             List<Long> offsets = endOffsets.get(entry.getKey());
-            if (replace || offsets == null) {
+            if (offsets == null) {
                 offsets = new ArrayList<>();
             }
             offsets.add(entry.getValue());
@@ -568,6 +559,15 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public void enforceRebalance() {
+        shouldRebalance = true;
+    }
+
+    public boolean shouldRebalance() {
+        return shouldRebalance;
+    }
+
+    public void resetShouldRebalance() {
+        shouldRebalance = false;
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d67fff8..7b6ac83 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -508,11 +508,12 @@ public class StreamThread extends Thread {
                     "Will close out all assigned tasks and rejoin the consumer 
group.");
 
                 taskManager.handleLostAll();
-                mainConsumer.enforceRebalance();
+                mainConsumer.unsubscribe();
+                subscribeConsumer();
             }
         }
     }
-    
+
     private void subscribeConsumer() {
         if (builder.usesPatternSubscription()) {
             mainConsumer.subscribe(builder.sourceTopicPattern(), 
rebalanceListener);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 13a669d..1bb4c9c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
@@ -808,6 +809,66 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldNotReturnDataAfterTaskMigrated() {
+        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
+
+        internalTopologyBuilder = 
EasyMock.createNiceMock(InternalTopologyBuilder.class);
+
+        
EasyMock.expect(internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList(topic1)).times(2);
+
+        final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        consumer.subscribe(Collections.singletonList(topic1), new 
MockRebalanceListener());
+        consumer.rebalance(Collections.singletonList(t1p1));
+        consumer.updateEndOffsets(Collections.singletonMap(t1p1, 10L));
+        consumer.seekToEnd(Collections.singletonList(t1p1));
+
+        final ChangelogReader changelogReader = new MockChangelogReader() {
+
+            @Override
+            public void restore() {
+                consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new 
byte[0], new byte[0]));
+                consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new 
byte[1], new byte[0]));
+
+                throw new TaskMigratedException(
+                    "Changelog restore found task migrated", new 
RuntimeException("restore task migrated"));
+            }
+        };
+
+        taskManager.handleLostAll();
+
+        EasyMock.replay(taskManager, internalTopologyBuilder);
+
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST);
+
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            config,
+            null,
+            consumer,
+            consumer,
+            changelogReader,
+            null,
+            taskManager,
+            streamsMetrics,
+            internalTopologyBuilder,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger()
+        ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+
+        final IllegalStateException thrown = assertThrows(
+            IllegalStateException.class, thread::run);
+
+        EasyMock.verify(taskManager);
+
+        // The Mock consumer shall throw as the assignment has been wiped out, 
but records are assigned.
+        assertEquals("No current assignment for partition topic1-1", 
thrown.getMessage());
+        assertFalse(consumer.shouldRebalance());
+    }
+
+    @Test
     public void shouldShutdownTaskManagerOnCloseWithoutStart() {
         final Consumer<byte[], byte[]> consumer = 
EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);

Reply via email to