This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 9dce3e7 KAFKA-8972 (2.4 blocker): correctly release lost partitions
during consumer.unsubscribe() (#7441)
9dce3e7 is described below
commit 9dce3e75353f10dd9770e776c227f47bbe993230
Author: Boyang Chen <[email protected]>
AuthorDate: Tue Oct 29 10:41:25 2019 -0700
KAFKA-8972 (2.4 blocker): correctly release lost partitions during
consumer.unsubscribe() (#7441)
Inside onLeavePrepare we look into the current assignment, try to revoke the
owned partitions, notify users via ConsumerRebalanceListener#onPartitionsRevoked,
and then clear the assignment.
However, by that point the subscription's assignment has already been cleared
by this.subscriptions.unsubscribe(), which means the user's rebalance listener
is never triggered. In other words, from the consumer client's point of view
nothing is owned after unsubscribe, but from the caller's point of view the
partitions have not been revoked yet. For callers like Kafka Streams, which rely
on the rebalance listener to maintain their internal state, this leads to
inconsistent state management and failures.
Before KIP-429 this issue was hidden, since every time the consumer re-joined
the group later it would still revoke everything anyway, regardless of the
partitions passed to the rebalance listener; with KIP-429 the issue is much
easier to reproduce.
Our fixes are the following:
• Inside unsubscribe, first call onLeavePrepare / maybeLeaveGroup and only then
subscriptions.unsubscribe. This way we are guaranteed that the Streams tasks are
all closed as revoked by then.
• [Optimization] If the generation has been reset due to a fatal error from a
join / heartbeat response etc., then we know that all partitions are lost, so
inside onLeavePrepare we should not trigger onPartitionsRevoked but only
onPartitionsLost. This is because we don't want to commit for lost tasks during
a rebalance; such a commit is doomed to fail, as we no longer have any generation info.
Reviewers: Matthias J. Sax <[email protected]>, A. Sophie Blee-Goldman
<[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang
<[email protected]>
---
.../kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../consumer/internals/ConsumerCoordinator.java | 7 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 100 +++++++++++++++++----
.../internals/ConsumerCoordinatorTest.java | 30 ++++++-
.../internals/StreamsRebalanceListener.java | 6 +-
.../streams/processor/internals/TaskManager.java | 14 +++
tests/kafkatest/services/streams.py | 14 ++-
.../tests/streams/streams_broker_bounce_test.py | 13 +--
8 files changed, 157 insertions(+), 29 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f12beaf..5a5cad8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1069,11 +1069,11 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
acquireAndEnsureOpen();
try {
fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
- this.subscriptions.unsubscribe();
if (this.coordinator != null) {
this.coordinator.onLeavePrepare();
this.coordinator.maybeLeaveGroup("the consumer unsubscribed
from all topics");
}
+ this.subscriptions.unsubscribe();
log.info("Unsubscribed all topics or patterns and assigned
partitions");
} finally {
release();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index bceb9b8..d5b3061 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -696,7 +696,12 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
Set<TopicPartition> droppedPartitions = new
HashSet<>(subscriptions.assignedPartitions());
if (subscriptions.partitionsAutoAssigned() &&
!droppedPartitions.isEmpty()) {
- final Exception e = invokePartitionsRevoked(droppedPartitions);
+ final Exception e;
+ if (generation() != Generation.NO_GENERATION) {
+ e = invokePartitionsRevoked(droppedPartitions);
+ } else {
+ e = invokePartitionsLost(droppedPartitions);
+ }
subscriptions.assignFromSubscribed(Collections.emptySet());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 68fac2b..e939d96 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -150,6 +150,12 @@ public class KafkaConsumerTest {
private final String groupId = "mock-group";
private final Optional<String> groupInstanceId =
Optional.of("mock-instance");
+ private final String partitionRevoked = "Hit partition revoke ";
+ private final String partitionAssigned = "Hit partition assign ";
+ private final String partitionLost = "Hit partition lost ";
+
+ private final Collection<TopicPartition> singleTopicPartition =
Collections.singleton(new TopicPartition(topic, 0));
+
@Test
public void testMetricsReporterAutoGeneratedClientId() {
Properties props = new Properties();
@@ -409,7 +415,7 @@ public class KafkaConsumerTest {
assertEquals(singleton(tp0), consumer.assignment());
- AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client,
coordinator);
+ AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client,
coordinator, Errors.NONE);
// heartbeat interval is 2 seconds
time.sleep(heartbeatIntervalMs);
@@ -444,7 +450,7 @@ public class KafkaConsumerTest {
client.poll(0, time.milliseconds());
client.prepareResponseFrom(fetchResponse(tp0, 5, 0), node);
- AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client,
coordinator);
+ AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client,
coordinator, Errors.NONE);
time.sleep(heartbeatIntervalMs);
Thread.sleep(heartbeatIntervalMs);
@@ -661,7 +667,6 @@ public class KafkaConsumerTest {
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
- Node node = metadata.fetch().nodes().get(0);
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
@@ -1066,12 +1071,7 @@ public class KafkaConsumerTest {
KafkaConsumer<String, String> consumer = newConsumer(time, client,
subscription, metadata, assignor, false, groupInstanceId);
- // initial subscription
- consumer.subscribe(singleton(topic),
getConsumerRebalanceListener(consumer));
-
- // verify that subscription has changed but assignment is still
unchanged
- assertEquals(singleton(topic), consumer.subscription());
- assertEquals(Collections.emptySet(), consumer.assignment());
+ initializeSubscriptionWithSingleTopic(consumer,
getConsumerRebalanceListener(consumer));
// mock rebalance responses
prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1112,6 +1112,71 @@ public class KafkaConsumerTest {
}
@Test
+ public void
testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration() {
+ Time time = new MockTime();
+ SubscriptionState subscription = new SubscriptionState(new
LogContext(), OffsetResetStrategy.EARLIEST);
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
+
+ CooperativeStickyAssignor assignor = new CooperativeStickyAssignor();
+ KafkaConsumer<String, String> consumer = newConsumer(time, client,
subscription, metadata, assignor, false, groupInstanceId);
+
+ initializeSubscriptionWithSingleTopic(consumer,
getExceptionConsumerRebalanceListener());
+
+ prepareRebalance(client, node, assignor, singletonList(tp0), null);
+
+ RuntimeException assignmentException =
assertThrows(RuntimeException.class,
+ () ->
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)));
+ assertEquals(partitionAssigned + singleTopicPartition,
assignmentException.getCause().getMessage());
+
+ RuntimeException unsubscribeException =
assertThrows(RuntimeException.class, consumer::unsubscribe);
+ assertEquals(partitionRevoked + singleTopicPartition,
unsubscribeException.getCause().getMessage());
+ }
+
+ @Test
+ public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration()
throws Exception {
+ Time time = new MockTime();
+ SubscriptionState subscription = new SubscriptionState(new
LogContext(), OffsetResetStrategy.EARLIEST);
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
+
+ CooperativeStickyAssignor assignor = new CooperativeStickyAssignor();
+ KafkaConsumer<String, String> consumer = newConsumer(time, client,
subscription, metadata, assignor, false, groupInstanceId);
+
+ initializeSubscriptionWithSingleTopic(consumer,
getExceptionConsumerRebalanceListener());
+ Node coordinator = prepareRebalance(client, node, assignor,
singletonList(tp0), null);
+
+ RuntimeException assignException = assertThrows(RuntimeException.class,
+ () ->
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)));
+ assertEquals(partitionAssigned + singleTopicPartition,
assignException.getCause().getMessage());
+
+ AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client,
coordinator, Errors.UNKNOWN_MEMBER_ID);
+
+ time.sleep(heartbeatIntervalMs);
+ TestUtils.waitForCondition(heartbeatReceived::get, "Heartbeat response
did not occur within timeout.");
+
+ consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
+ assertTrue(heartbeatReceived.get());
+
+ RuntimeException unsubscribeException =
assertThrows(RuntimeException.class, consumer::unsubscribe);
+ assertEquals(partitionLost + singleTopicPartition,
unsubscribeException.getCause().getMessage());
+ }
+
+ private void initializeSubscriptionWithSingleTopic(KafkaConsumer<String,
String> consumer,
+
ConsumerRebalanceListener consumerRebalanceListener) {
+ consumer.subscribe(singleton(topic), consumerRebalanceListener);
+ // verify that subscription has changed but assignment is still
unchanged
+ assertEquals(singleton(topic), consumer.subscription());
+ assertEquals(Collections.emptySet(), consumer.assignment());
+ }
+
+ @Test
public void testManualAssignmentChangeWithAutoCommitEnabled() {
Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new
LogContext(), OffsetResetStrategy.EARLIEST);
@@ -1666,7 +1731,7 @@ public class KafkaConsumerTest {
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
fail("Should throw exception");
} catch (Throwable e) {
- assertEquals("boom!", e.getCause().getMessage());
+ assertEquals(partitionAssigned + singleTopicPartition,
e.getCause().getMessage());
}
// the assignment is still updated regardless of the exception
@@ -1677,7 +1742,7 @@ public class KafkaConsumerTest {
consumer.close(Duration.ofMillis(0));
fail("Should throw exception");
} catch (Throwable e) {
- assertEquals("boom!", e.getCause().getCause().getMessage());
+ assertEquals(partitionRevoked + singleTopicPartition,
e.getCause().getCause().getMessage());
}
consumer.close(Duration.ofMillis(0));
@@ -1720,12 +1785,17 @@ public class KafkaConsumerTest {
return new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
- throw new RuntimeException("boom!");
+ throw new RuntimeException(partitionRevoked + partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
- throw new RuntimeException("boom!");
+ throw new RuntimeException(partitionAssigned + partitions);
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition>
partitions) {
+ throw new RuntimeException(partitionLost + partitions);
}
};
}
@@ -1779,7 +1849,7 @@ public class KafkaConsumerTest {
return coordinator;
}
- private AtomicBoolean prepareHeartbeatResponse(MockClient client, Node
coordinator) {
+ private AtomicBoolean prepareHeartbeatResponse(MockClient client, Node
coordinator, Errors error) {
final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
client.prepareResponseFrom(new MockClient.RequestMatcher() {
@Override
@@ -1787,7 +1857,7 @@ public class KafkaConsumerTest {
heartbeatReceived.set(true);
return true;
}
- }, new HeartbeatResponse(new
HeartbeatResponseData().setErrorCode(Errors.NONE.code())), coordinator);
+ }, new HeartbeatResponse(new
HeartbeatResponseData().setErrorCode(error.code())), coordinator);
return heartbeatReceived;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index fd3411b..5ff9761 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -454,6 +454,34 @@ public class ConsumerCoordinatorTest {
}
@Test
+ public void testUnsubscribeWithValidGeneration() {
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+ subscriptions.subscribe(singleton(topic1), rebalanceListener);
+ ByteBuffer buffer = ConsumerProtocol.serializeAssignment(
+ new
ConsumerPartitionAssignor.Assignment(Collections.singletonList(t1p),
ByteBuffer.wrap(new byte[0])));
+ coordinator.onJoinComplete(1, "memberId", partitionAssignor.name(),
buffer);
+
+ coordinator.onLeavePrepare();
+ assertEquals(1, rebalanceListener.lostCount);
+ assertEquals(0, rebalanceListener.revokedCount);
+ }
+
+ @Test
+ public void testUnsubscribeWithInvalidGeneration() {
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+ subscriptions.subscribe(singleton(topic1), rebalanceListener);
+ subscriptions.assignFromSubscribed(Collections.singletonList(t1p));
+
+ coordinator.onLeavePrepare();
+ assertEquals(1, rebalanceListener.lostCount);
+ assertEquals(0, rebalanceListener.revokedCount);
+ }
+
+ @Test
public void testUnknownMemberId() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -2300,7 +2328,7 @@ public class ConsumerCoordinatorTest {
MockTime time = new MockTime(1);
- //onJoinPrepare will be executed and onJoinComplete will not.
+ // onJoinPrepare will be executed and onJoinComplete will not.
boolean res = coordinator.joinGroupIfNeeded(time.timer(2));
assertFalse(res);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index 3adac44..f2c75b2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -36,9 +36,9 @@ public class StreamsRebalanceListener implements
ConsumerRebalanceListener {
private final Logger log;
StreamsRebalanceListener(final Time time,
- final TaskManager taskManager,
- final StreamThread streamThread,
- final Logger log) {
+ final TaskManager taskManager,
+ final StreamThread streamThread,
+ final Logger log) {
this.time = time;
this.taskManager = taskManager;
this.streamThread = streamThread;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index a9ccbf5..82496df 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -466,6 +466,20 @@ public class TaskManager {
}
}
+ log.debug("Assigning metadata with: " +
+ "\tactiveTasks: {},\n" +
+ "\tstandbyTasks: {}\n" +
+ "The updated active task states are: \n" +
+ "\tassignedActiveTasks {},\n" +
+ "\tassignedStandbyTasks {},\n" +
+ "\taddedActiveTasks {},\n" +
+ "\taddedStandbyTasks {},\n" +
+ "\trevokedActiveTasks {},\n" +
+ "\trevokedStandbyTasks {}",
+ activeTasks, standbyTasks,
+ assignedActiveTasks, assignedStandbyTasks,
+ addedActiveTasks, addedStandbyTasks,
+ revokedActiveTasks, revokedStandbyTasks);
this.assignedActiveTasks = activeTasks;
this.assignedStandbyTasks = standbyTasks;
}
diff --git a/tests/kafkatest/services/streams.py
b/tests/kafkatest/services/streams.py
index 52afe4e..06686cf 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -303,12 +303,20 @@ class StreamsTestBaseService(KafkaPathResolverMixin,
JmxMixin, Service):
class StreamsSmokeTestBaseService(StreamsTestBaseService):
"""Base class for Streams Smoke Test services providing some common
settings and functionality"""
- def __init__(self, test_context, kafka, command):
+ def __init__(self, test_context, kafka, command, num_threads = 3):
super(StreamsSmokeTestBaseService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsSmokeTest",
command)
+ self.NUM_THREADS = num_threads
+ def prop_file(self):
+ properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+ streams_property.KAFKA_SERVERS:
self.kafka.bootstrap_servers(),
+ streams_property.NUM_THREADS: self.NUM_THREADS}
+
+ cfg = KafkaConfig(**properties)
+ return cfg.render()
class StreamsEosTestBaseService(StreamsTestBaseService):
"""Base class for Streams EOS Test services providing some common settings
and functionality"""
@@ -352,8 +360,8 @@ class
StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
return cmd
class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
- def __init__(self, test_context, kafka):
- super(StreamsSmokeTestJobRunnerService, self).__init__(test_context,
kafka, "process")
+ def __init__(self, test_context, kafka, num_threads = 3):
+ super(StreamsSmokeTestJobRunnerService, self).__init__(test_context,
kafka, "process", num_threads)
class StreamsSmokeTestEOSJobRunnerService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index 3d6572d..974d450 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -150,7 +150,7 @@ class StreamsBrokerBounceTest(Test):
return True
- def setup_system(self, start_processor=True):
+ def setup_system(self, start_processor=True, num_threads=3):
# Setup phase
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()
@@ -165,7 +165,7 @@ class StreamsBrokerBounceTest(Test):
# Start test harness
self.driver = StreamsSmokeTestDriverService(self.test_context,
self.kafka)
- self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context,
self.kafka)
+ self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context,
self.kafka, num_threads)
self.driver.start()
@@ -208,13 +208,16 @@ class StreamsBrokerBounceTest(Test):
@cluster(num_nodes=7)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce",
"hard_bounce"],
broker_type=["leader", "controller"],
+ num_threads=[1, 3],
sleep_time_secs=[120])
- def test_broker_type_bounce(self, failure_mode, broker_type,
sleep_time_secs):
+ def test_broker_type_bounce(self, failure_mode, broker_type,
sleep_time_secs, num_threads):
"""
Start a smoke test client, then kill one particular broker and ensure
data is still received
- Record if records are delivered.
+ Record if records are delivered.
+ We also add a single thread stream client to make sure we could get
all partitions reassigned in
+ next generation so to verify the partition lost is correctly triggered.
"""
- self.setup_system()
+ self.setup_system(num_threads=num_threads)
# Sleep to allow test to run for a bit
time.sleep(sleep_time_secs)