This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1e45927 KAFKA-12648: fix IllegalStateException in ClientState after
removing topologies (#11591)
1e45927 is described below
commit 1e459271d777e4721c3f7a36c5b4fbb2a5793f63
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Dec 10 14:26:27 2021 -0800
KAFKA-12648: fix IllegalStateException in ClientState after removing
topologies (#11591)
Fix for one of the causes of failure in the NamedTopologyIntegrationTest:
org.apache.kafka.streams.errors.StreamsException:
java.lang.IllegalStateException: Must initialize prevActiveTasks from
ownedPartitions before initializing remaining tasks.
This exception could occur if a member sent a subscription in which all of
its ownedPartitions belonged to a named topology that the group leader no longer
recognizes, e.g. because that topology was just removed from the client. We now
filter each ClientState against the current topology so that the assignor only
processes the partitions/tasks it can identify. The member with the out-of-date
tasks will eventually clean them up when the #removeNamedTopology API is
invoked on that member.
Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang
<[email protected]>, Walker Carlson <[email protected]>
---
.../internals/StreamsPartitionAssignor.java | 2 +-
.../internals/assignment/ClientState.java | 17 ++++++++++-
.../KafkaStreamsNamedTopologyWrapper.java | 4 +--
.../internals/StreamsPartitionAssignorTest.java | 8 ++---
.../internals/assignment/ClientStateTest.java | 34 ++++++++++++++++++----
5 files changed, 51 insertions(+), 14 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 2ae381d..083253c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -680,7 +680,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
for (final Map.Entry<UUID, ClientMetadata> entry :
clientMetadataMap.entrySet()) {
final UUID uuid = entry.getKey();
final ClientState state = entry.getValue().state;
- state.initializePrevTasks(taskForPartition);
+ state.initializePrevTasks(taskForPartition,
taskManager.topologyMetadata().hasNamedTopologies());
state.computeTaskLags(uuid, allTaskEndOffsetSums);
clientStates.put(uuid, state);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index d828f1e..46f107e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -277,14 +277,29 @@ public class ClientState {
consumerToPreviousStatefulTaskIds.put(consumerId,
taskOffsetSums.keySet());
}
- public void initializePrevTasks(final Map<TopicPartition, TaskId>
taskForPartitionMap) {
+ public void initializePrevTasks(final Map<TopicPartition, TaskId>
taskForPartitionMap,
+ final boolean hasNamedTopologies) {
if (!previousActiveTasks.taskIds().isEmpty() ||
!previousStandbyTasks.taskIds().isEmpty()) {
throw new IllegalStateException("Already added previous tasks to
this client state.");
}
+
+ maybeFilterUnknownPrevTasksAndPartitions(taskForPartitionMap,
hasNamedTopologies);
initializePrevActiveTasksFromOwnedPartitions(taskForPartitionMap);
initializeRemainingPrevTasksFromTaskOffsetSums();
}
+ private void maybeFilterUnknownPrevTasksAndPartitions(final
Map<TopicPartition, TaskId> taskForPartitionMap,
+ final boolean
hasNamedTopologies) {
+ // If this application uses named topologies, then it's possible for
members to report tasks
+ // or partitions in their subscription that belong to a named topology
that the group leader
+ // doesn't currently recognize, eg because it was just removed
+ if (hasNamedTopologies) {
+ ownedPartitions.keySet().retainAll(taskForPartitionMap.keySet());
+
previousActiveTasks.taskIds().retainAll(taskForPartitionMap.values());
+
previousStandbyTasks.taskIds().retainAll(taskForPartitionMap.values());
+ }
+ }
+
/**
* Compute the lag for each stateful task, including tasks this client did
not previously have.
*/
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
index e7ed7ef..0dd7eca 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
@@ -153,7 +153,7 @@ public class KafkaStreamsNamedTopologyWrapper extends
KafkaStreams {
* @throws TopologyException if this topology subscribes to any
input topics or pattern already in use
*/
public AddNamedTopologyResult addNamedTopology(final NamedTopology
newTopology) {
- log.debug("Adding topology: {}", newTopology.name());
+ log.info("Adding topology: {}", newTopology.name());
if (hasStartedOrFinishedShuttingDown()) {
throw new IllegalStateException("Cannot add a NamedTopology while
the state is " + super.state);
} else if (getTopologyByName(newTopology.name()).isPresent()) {
@@ -179,7 +179,7 @@ public class KafkaStreamsNamedTopologyWrapper extends
KafkaStreams {
* @throws TopologyException if this topology subscribes to any
input topics or pattern already in use
*/
public RemoveNamedTopologyResult removeNamedTopology(final String
topologyToRemove, final boolean resetOffsets) {
- log.debug("Removing topology: {}", topologyToRemove);
+ log.info("Removing topology: {}", topologyToRemove);
if (!isRunningOrRebalancing()) {
throw new IllegalStateException("Cannot remove a NamedTopology
while the state is " + super.state);
} else if (!getTopologyByName(topologyToRemove).isPresent()) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 24bf5b8..971b688 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -312,7 +312,7 @@ public class StreamsPartitionAssignorTest {
state.addPreviousTasksAndOffsetSums(CONSUMER_1,
getTaskOffsetSums(asList(TASK_0_0, TASK_1_1, TASK_1_3), EMPTY_TASKS));
state.addPreviousTasksAndOffsetSums(CONSUMER_2,
getTaskOffsetSums(asList(TASK_0_3, TASK_1_0), EMPTY_TASKS));
state.addPreviousTasksAndOffsetSums(CONSUMER_3,
getTaskOffsetSums(asList(TASK_0_1, TASK_0_2, TASK_1_2), EMPTY_TASKS));
- state.initializePrevTasks(emptyMap());
+ state.initializePrevTasks(emptyMap(), false);
state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks));
assertEquivalentAssignment(
@@ -342,7 +342,7 @@ public class StreamsPartitionAssignorTest {
state.addPreviousTasksAndOffsetSums(CONSUMER_1,
getTaskOffsetSums(asList(TASK_0_0, TASK_1_1, TASK_1_3), EMPTY_TASKS));
state.addPreviousTasksAndOffsetSums(CONSUMER_2,
getTaskOffsetSums(asList(TASK_0_3, TASK_1_0), EMPTY_TASKS));
state.addPreviousTasksAndOffsetSums(CONSUMER_3,
getTaskOffsetSums(asList(TASK_0_1, TASK_0_2, TASK_1_2), EMPTY_TASKS));
- state.initializePrevTasks(emptyMap());
+ state.initializePrevTasks(emptyMap(), false);
state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks));
// We should be able to add a new task without sacrificing stickiness
@@ -378,7 +378,7 @@ public class StreamsPartitionAssignorTest {
state.addPreviousTasksAndOffsetSums(CONSUMER_1,
getTaskOffsetSums(asList(TASK_0_0, TASK_1_1, TASK_1_3), EMPTY_TASKS));
state.addPreviousTasksAndOffsetSums(CONSUMER_2,
getTaskOffsetSums(asList(TASK_0_3, TASK_1_0), EMPTY_TASKS));
state.addPreviousTasksAndOffsetSums(CONSUMER_3,
getTaskOffsetSums(asList(TASK_0_1, TASK_0_2, TASK_1_2), EMPTY_TASKS));
- state.initializePrevTasks(emptyMap());
+ state.initializePrevTasks(emptyMap(), false);
state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks));
// Consumer 3 leaves the group
@@ -421,7 +421,7 @@ public class StreamsPartitionAssignorTest {
consumers.add(CONSUMER_4);
state.addPreviousTasksAndOffsetSums(CONSUMER_4,
getTaskOffsetSums(EMPTY_TASKS, EMPTY_TASKS));
- state.initializePrevTasks(emptyMap());
+ state.initializePrevTasks(emptyMap(), false);
state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks));
final Map<String, List<TaskId>> assignment = assignTasksToThreads(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
index e8acdc3..928129c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
@@ -312,13 +313,32 @@ public class ClientStateTest {
public void shouldAddTasksWithLatestOffsetToPrevActiveTasks() {
final Map<TaskId, Long> taskOffsetSums =
Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET);
client.addPreviousTasksAndOffsetSums("c1", taskOffsetSums);
- client.initializePrevTasks(Collections.emptyMap());
+ client.initializePrevTasks(Collections.emptyMap(), false);
assertThat(client.prevActiveTasks(),
equalTo(Collections.singleton(TASK_0_1)));
assertThat(client.previousAssignedTasks(),
equalTo(Collections.singleton(TASK_0_1)));
assertTrue(client.prevStandbyTasks().isEmpty());
}
@Test
+ public void
shouldThrowWhenSomeOwnedPartitionsAreNotRecognizedWhenInitializingPrevTasks() {
+ final Map<TopicPartition, TaskId> taskForPartitionMap =
Collections.singletonMap(TP_0_1, TASK_0_1);
+ client.addOwnedPartitions(Collections.singleton(TP_0_0), "c1");
+ client.addPreviousTasksAndOffsetSums("c1", Collections.emptyMap());
+ assertThrows(IllegalStateException.class, () ->
client.initializePrevTasks(taskForPartitionMap, false));
+ }
+
+ @Test
+ public void
shouldFilterOutUnrecognizedPartitionsAndInitializePrevTasksWhenUsingNamedTopologies()
{
+ final Map<TopicPartition, TaskId> taskForPartitionMap =
Collections.singletonMap(TP_0_1, TASK_0_1);
+ client.addOwnedPartitions(Collections.singleton(TP_0_0), "c1");
+ client.addPreviousTasksAndOffsetSums("c1", Collections.emptyMap());
+ client.initializePrevTasks(taskForPartitionMap, true);
+ assertThat(client.prevActiveTasks().isEmpty(), is(true));
+ assertThat(client.previousAssignedTasks().isEmpty(), is(true));
+ assertThat(client.prevStandbyTasks().isEmpty(), is(true));
+ }
+
+ @Test
public void shouldReturnPreviousStatefulTasksForConsumer() {
client.addPreviousTasksAndOffsetSums("c1", mkMap(
mkEntry(TASK_0_0, 100L),
@@ -327,7 +347,7 @@ public class ClientStateTest {
client.addPreviousTasksAndOffsetSums("c2",
Collections.singletonMap(TASK_0_2, 0L));
client.addPreviousTasksAndOffsetSums("c3", Collections.emptyMap());
- client.initializePrevTasks(Collections.emptyMap());
+ client.initializePrevTasks(Collections.emptyMap(), false);
assertThat(client.prevOwnedStatefulTasksByConsumer("c1"),
equalTo(mkSet(TASK_0_0, TASK_0_1)));
assertThat(client.prevOwnedStatefulTasksByConsumer("c2"),
equalTo(mkSet(TASK_0_2)));
@@ -338,13 +358,15 @@ public class ClientStateTest {
public void shouldReturnPreviousActiveStandbyTasksForConsumer() {
client.addOwnedPartitions(mkSet(TP_0_1, TP_1_1), "c1");
client.addOwnedPartitions(mkSet(TP_0_2, TP_1_2), "c2");
- client.initializePrevTasks(mkMap(
+ client.initializePrevTasks(
+ mkMap(
mkEntry(TP_0_0, TASK_0_0),
mkEntry(TP_0_1, TASK_0_1),
mkEntry(TP_0_2, TASK_0_2),
mkEntry(TP_1_0, TASK_0_0),
mkEntry(TP_1_1, TASK_0_1),
- mkEntry(TP_1_2, TASK_0_2))
+ mkEntry(TP_1_2, TASK_0_2)),
+ false
);
client.addPreviousTasksAndOffsetSums("c1", mkMap(
@@ -406,7 +428,7 @@ public class ClientStateTest {
mkEntry(TASK_0_2, 100L)
);
client.addPreviousTasksAndOffsetSums("c1", taskOffsetSums);
- client.initializePrevTasks(Collections.emptyMap());
+ client.initializePrevTasks(Collections.emptyMap(), false);
assertThat(client.prevStandbyTasks(), equalTo(mkSet(TASK_0_1,
TASK_0_2)));
assertThat(client.previousAssignedTasks(), equalTo(mkSet(TASK_0_1,
TASK_0_2)));
assertTrue(client.prevActiveTasks().isEmpty());
@@ -501,7 +523,7 @@ public class ClientStateTest {
@Test
public void
shouldThrowIllegalStateExceptionIfAttemptingToInitializeNonEmptyPrevTaskSets() {
client.addPreviousActiveTasks(Collections.singleton(TASK_0_1));
- assertThrows(IllegalStateException.class, () ->
client.initializePrevTasks(Collections.emptyMap()));
+ assertThrows(IllegalStateException.class, () ->
client.initializePrevTasks(Collections.emptyMap(), false));
}
@Test