This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
     new 0f3ebdac97b HOTFIX: avoid placement of unnecessary transient standby tasks & improve assignor logging (#14149)
0f3ebdac97b is described below
commit 0f3ebdac97b5017ec42c19483e43b699f3a9a90f
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Wed Aug 30 13:29:38 2023 -0700
HOTFIX: avoid placement of unnecessary transient standby tasks & improve assignor logging (#14149)
Minor fix to avoid creating unnecessary standby tasks, which can be
surprising or unexpected, as in the case of an application with
num.standby.replicas = 0 and warmup replicas disabled.
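
For concreteness, a minimal sketch of the configuration in question (the
StreamsConfig key is real; the class and surrounding setup are hypothetical,
and warmup replicas are assumed disabled as described above):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class NoStandbysConfigSketch {
        public static void main(String[] args) {
            final Properties props = new Properties();
            // No standby replicas requested, so any standby task that shows up
            // in the assignment is a surprise to the user
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0);
            System.out.println(props); // {num.standby.replicas=0}
        }
    }
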
The "bug" here was introduced during the fix for an issue with cooperative
rebalancing and in-memory stores. The fundamental problem is that in-memory
stores cannot be unassigned from a consumer for any period, however temporary,
without being closed and losing all the accumulated state. This caused some
grief when the new HA task assignor would assign an active task to a node based
on the readiness of the standby version of that task, but would have to remove
the active task from the [...]
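
To illustrate the fundamental problem, here is a minimal self-contained
sketch (a hypothetical class, not the actual Kafka Streams store API) of why
even a momentary unassignment destroys in-memory state:

    import java.util.HashMap;
    import java.util.Map;

    public class InMemoryStoreSketch {
        private Map<String, Long> state = new HashMap<>();

        void put(final String key, final long value) {
            state.put(key, value);
        }

        void close() {
            // An in-memory store has no on-disk copy: closing it frees the contents for good
            state = null;
        }

        void reopen() {
            // Reopening yields a fresh, empty map; the accumulated state is gone
            state = new HashMap<>();
        }

        public static void main(final String[] args) {
            final InMemoryStoreSketch store = new InMemoryStoreSketch();
            store.put("count", 42L);
            store.close();   // e.g. the task was briefly unassigned in a cooperative rebalance
            store.reopen();  // the task comes straight back, but its state does not
            System.out.println(store.state.isEmpty()); // prints true
        }
    }
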
To fix this, we began placing standby tasks on the intended recipient of an
active task that was still awaiting revocation from another consumer.
However, the fix was a bit of an overreach: we assigned these temporary
standby tasks in all cases, regardless of whether there had previously been a
standby version of that task. We can narrow this down without sacrificing any
of the intended functionality by only assigning this kind of standby task
where the consumer had previously owned some ve [...]
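
In sketch form, the narrowed guard from the StreamsPartitionAssignor hunk
below behaves like this (TaskId and ClientState here are minimal stand-ins,
not the real classes):

    import java.util.Set;

    public class TemporaryStandbySketch {
        // Minimal stand-ins for the real TaskId and ClientState types
        record TaskId(int topicGroup, int partition) {}

        static class ClientState {
            private final Set<TaskId> previousStandbyTasks;
            ClientState(final Set<TaskId> previousStandbyTasks) {
                this.previousStandbyTasks = previousStandbyTasks;
            }
            boolean previouslyOwnedStandby(final TaskId task) {
                return previousStandbyTasks.contains(task);
            }
        }

        static boolean shouldPlaceTemporaryStandby(final ClientState clientState,
                                                   final TaskId task,
                                                   final Set<TaskId> allStatefulTasks) {
            // A temporary standby is only useful when there is existing standby
            // state on this client that would otherwise be closed and lost
            return clientState.previouslyOwnedStandby(task) && allStatefulTasks.contains(task);
        }

        public static void main(final String[] args) {
            final TaskId task = new TaskId(0, 0);
            final Set<TaskId> stateful = Set.of(task);
            // Client with no prior standby state: no transient standby is placed
            System.out.println(shouldPlaceTemporaryStandby(new ClientState(Set.of()), task, stateful));     // false
            // Client that already hosts standby state: keep it alive until the active task arrives
            System.out.println(shouldPlaceTemporaryStandby(new ClientState(Set.of(task)), task, stateful)); // true
        }
    }
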
Also breaks up some of the long log lines in the StreamsPartitionAssignor and
expands the summary info while moving it all to the front of the line
(following reports of missing info due to truncation of long log lines in
larger applications).
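
The logging change follows a simple pattern, sketched below with System.out
standing in for the SLF4J logger: emit the short summary as its own line
before the potentially very long per-node detail, so truncation of the latter
cannot swallow the former.

    import java.util.Map;
    import java.util.TreeMap;
    import java.util.stream.Collectors;

    public class SplitLogSketch {
        public static void main(final String[] args) {
            final Map<String, String> assignment = new TreeMap<>(Map.of(
                "node-1", "[0_0, 0_2]",
                "node-2", "[0_1, 0_3]"
            ));

            // Summary first, as its own short line, so the headline numbers
            // always get through intact
            System.out.printf("Assigned %d total tasks to %d client nodes.%n", 4, assignment.size());
            // Per-node detail second; this is the part that can grow huge in large apps
            System.out.println("Assignment of tasks to nodes: " + assignment.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(System.lineSeparator())));
        }
    }
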
---
 .../internals/StreamsPartitionAssignor.java        | 52 +++++++++++--------
 .../internals/assignment/ClientState.java          |  6 ++-
 .../internals/StreamsPartitionAssignorTest.java    | 60 ++++++++++++++++++++++
 3 files changed, 97 insertions(+), 21 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 330d46a1780..e232ef15df0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -625,18 +625,24 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         final boolean lagComputationSuccessful =
             populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogTopics);
 
-        log.info("{} members participating in this rebalance: \n{}.",
-                 clientStates.size(),
-                 clientStates.entrySet().stream()
-                     .sorted(comparingByKey())
-                     .map(entry -> entry.getKey() + ": " + entry.getValue().consumers())
-                     .collect(Collectors.joining(Utils.NL)));
+
+        log.info("{} client nodes and {} consumers participating in this rebalance: \n{}.",
+                 clientStates.size(),
+                 clientStates.values().stream().map(ClientState::capacity).reduce(Integer::sum),
+                 clientStates.entrySet().stream()
+                     .sorted(comparingByKey())
+                     .map(entry -> entry.getKey() + ": " + entry.getValue().consumers())
+                     .collect(Collectors.joining(Utils.NL)));
 
         final Set<TaskId> allTasks = partitionsForTask.keySet();
         statefulTasks.addAll(changelogTopics.statefulTaskIds());
 
-        log.debug("Assigning tasks {} including stateful {} to clients {} with number of replicas {}",
-                  allTasks, statefulTasks, clientStates, numStandbyReplicas());
+        log.info("Assigning stateful tasks: {}\n"
+                     + "and stateless tasks: {}",
+                 statefulTasks,
+                 allTasks.stream().filter(t -> !statefulTasks.contains(t)));
+        log.debug("Assigning tasks and {} standby replicas to client nodes {}",
+                  numStandbyReplicas(), clientStates);
 
         final TaskAssignor taskAssignor =
             createTaskAssignor(lagComputationSuccessful);
@@ -656,15 +662,17 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                                                           rackAwareTaskAssignor,
                                                           assignmentConfigs);
 
-        log.info("{} assigned tasks {} including stateful {} to {} clients as: \n{}.",
-                 allTasks.size(),
-                 allTasks,
-                 statefulTasks,
-                 clientStates.size(),
-                 clientStates.entrySet().stream()
-                     .sorted(comparingByKey())
-                     .map(entry -> entry.getKey() + "=" + entry.getValue().currentAssignment())
-                     .collect(Collectors.joining(Utils.NL)));
+        // Break this up into multiple logs to make sure the summary info gets through, which helps avoid
+        // info loss for example due to long line truncation with large apps
+        log.info("Assigned {} total tasks including {} stateful tasks to {} client nodes.",
+                 allTasks.size(),
+                 statefulTasks.size(),
+                 clientStates.size());
+        log.info("Assignment of tasks to nodes: {}",
+                 clientStates.entrySet().stream()
+                     .sorted(comparingByKey())
+                     .map(entry -> entry.getKey() + "=" + entry.getValue().currentAssignment())
+                     .collect(Collectors.joining(Utils.NL)));
 
         return probingRebalanceNeeded;
     }
@@ -1081,9 +1089,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         }
 
         for (final TaskId task : revokedTasks) {
-            if (allStatefulTasks.contains(task)) {
-                log.info("Adding removed stateful active task {} as a standby for {} before it is revoked in followup rebalance",
-                         task, consumer);
+            // If this task is stateful and already owned by the consumer, but can't (yet) be assigned as an active
+            // task during this rebalance as it must be revoked from another consumer first, place a temporary
+            // standby task here until it can receive the active task to avoid closing the state store (and losing
+            // all of the accumulated state in the case of in-memory stores)
+            if (clientState.previouslyOwnedStandby(task) && allStatefulTasks.contains(task)) {
+                log.info("Adding removed stateful active task {} as a standby for {} until it is revoked and can "
+                             + "be transitioned to active in a followup rebalance", task, consumer);
 
                 // This has no effect on the assignment, as we'll never consult the ClientState again, but
                 // it does perform a useful assertion that it's legal to assign this task as a standby to this instance
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 71fc1c3ef00..a7fcd514cfc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -124,7 +124,7 @@ public class ClientState {
         );
     }
 
-    int capacity() {
+    public int capacity() {
         return capacity;
     }
 
@@ -269,6 +269,10 @@ public class ClientState {
         );
     }
 
+    public boolean previouslyOwnedStandby(final TaskId task) {
+        return previousStandbyTasks.taskIds().contains(task);
+    }
+
     public int assignedTaskCount() {
         return activeTaskCount() + standbyTaskCount();
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 36ec4bb3fe2..d4742cc6644 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -748,6 +748,66 @@ public class StreamsPartitionAssignorTest {
         assertEquals(expectedInfo11TaskIds, info11.activeTasks());
     }
 
+    @Test
+    public void shouldNotAssignTemporaryStandbyTask() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+
+        final List<PartitionInfo> localInfos = asList(
+            new PartitionInfo("topic1", 0, NODE_0, REPLICA_0, REPLICA_0),
+            new PartitionInfo("topic1", 1, NODE_1, REPLICA_1, REPLICA_1),
+            new PartitionInfo("topic1", 2, NODE_2, REPLICA_2, REPLICA_2),
+            new PartitionInfo("topic1", 3, NODE_0, REPLICA_1, REPLICA_2)
+        );
+
+        final Cluster localMetadata = new Cluster(
+            "cluster",
+            asList(NODE_0, NODE_1, NODE_2),
+            localInfos,
+            emptySet(),
+            emptySet()
+        );
+
+        final List<String> topics = singletonList("topic1");
+
+        createMockTaskManager(mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3), emptySet());
+        configureDefaultPartitionAssignor();
+
+        subscriptions.put("consumer10",
+            new Subscription(
+                topics,
+                getInfo(UUID_1, mkSet(TASK_0_0, TASK_0_2), emptySet()).encode(),
+                asList(t1p0, t1p2),
+                DEFAULT_GENERATION,
+                Optional.of(RACK_2)
+            ));
+        subscriptions.put("consumer11",
+            new Subscription(
+                topics,
+                getInfo(UUID_1, mkSet(TASK_0_1, TASK_0_3), emptySet()).encode(),
+                asList(t1p1, t1p3),
+                DEFAULT_GENERATION,
+                Optional.of(RACK_2)
+            ));
+        subscriptions.put("consumer20",
+            new Subscription(
+                topics,
+                getInfo(UUID_2, emptySet(), emptySet()).encode(),
+                emptyList(),
+                DEFAULT_GENERATION,
+                Optional.of(RACK_2)
+            ));
+
+        final Map<String, Assignment> assignments = partitionAssignor.assign(localMetadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // neither active nor standby tasks should be assigned to consumer 3, which will have to wait until
+        // the followup cooperative rebalance to get the active task(s) it was assigned (and does not need
+        // a standby copy before that since it previously had no tasks at all)
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        assertTrue(info20.activeTasks().isEmpty());
+        assertTrue(info20.standbyTasks().isEmpty());
+
+    }
+
     @Test
     public void testAssignEmptyMetadata() {
         builder.addSource(null, "source1", null, null, null, "topic1");