This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/kip1071 by this push:
new 6f6cbef487e Follow-ups to the integration of Streams membership
manager (#18019)
6f6cbef487e is described below
commit 6f6cbef487e94d67db7ed941799b42a6bfd1ba75
Author: Bruno Cadonna <[email protected]>
AuthorDate: Tue Dec 3 16:51:32 2024 +0100
Follow-ups to the integration of Streams membership manager (#18019)
---
.../internals/StreamsAssignmentInterface.java | 25 +++++------
.../events/ApplicationEventProcessor.java | 6 +--
.../StreamsGroupHeartbeatRequestManagerTest.java | 25 ++---------
.../streams/processor/internals/StreamThread.java | 48 ++++++++++++----------
.../processor/internals/StreamThreadTest.java | 20 ++++-----
5 files changed, 53 insertions(+), 71 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
index 8d5754b7c9c..821b42cca08 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
@@ -304,9 +304,9 @@ public class StreamsAssignmentInterface {
private ApplicationEventHandler applicationEventHandler = null;
- private Optional<Function<Set<StreamsAssignmentInterface.TaskId>,
Optional<Exception>>> onTasksRevokedCallback = null;
- private Optional<Function<Assignment, Optional<Exception>>>
onTasksAssignedCallback = null;
- private Optional<Supplier<Optional<Exception>>> onAllTasksLostCallback =
null;
+ private Optional<Function<Set<StreamsAssignmentInterface.TaskId>,
Optional<Exception>>> onTasksRevokedCallback = Optional.empty();
+ private Optional<Function<Assignment, Optional<Exception>>>
onTasksAssignedCallback = Optional.empty();
+ private Optional<Supplier<Optional<Exception>>> onAllTasksLostCallback =
Optional.empty();
private final StreamsRebalanceEventProcessor
streamsRebalanceEventProcessor;
@@ -316,19 +316,18 @@ public class StreamsAssignmentInterface {
public void process(final BackgroundEvent event) {
switch (event.type()) {
case ERROR:
- process((ErrorEvent) event);
- break;
+ throw ((ErrorEvent) event).error();
case STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED:
- process((StreamsOnTasksRevokedCallbackNeededEvent) event);
+
processStreamsOnTasksRevokedCallbackNeededEvent((StreamsOnTasksRevokedCallbackNeededEvent)
event);
break;
case STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED:
- process((StreamsOnTasksAssignedCallbackNeededEvent) event);
+
processStreamsOnTasksAssignedCallbackNeededEvent((StreamsOnTasksAssignedCallbackNeededEvent)
event);
break;
case STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED:
- process((StreamsOnAllTasksLostCallbackNeededEvent) event);
+
processStreamsOnAllTasksLostCallbackNeededEvent((StreamsOnAllTasksLostCallbackNeededEvent)
event);
break;
default:
@@ -337,11 +336,7 @@ public class StreamsAssignmentInterface {
}
}
- private void process(final ErrorEvent event) {
- throw event.error();
- }
-
- private void process(final StreamsOnTasksRevokedCallbackNeededEvent
event) {
+ private void processStreamsOnTasksRevokedCallbackNeededEvent(final
StreamsOnTasksRevokedCallbackNeededEvent event) {
StreamsOnTasksRevokedCallbackCompletedEvent invokedEvent =
invokeOnTasksRevokedCallback(event.activeTasksToRevoke(), event.future());
applicationEventHandler.add(invokedEvent);
if (invokedEvent.error().isPresent()) {
@@ -349,7 +344,7 @@ public class StreamsAssignmentInterface {
}
}
- private void process(final StreamsOnTasksAssignedCallbackNeededEvent
event) {
+ private void processStreamsOnTasksAssignedCallbackNeededEvent(final
StreamsOnTasksAssignedCallbackNeededEvent event) {
StreamsOnTasksAssignedCallbackCompletedEvent invokedEvent =
invokeOnTasksAssignedCallback(event.assignment(), event.future());
applicationEventHandler.add(invokedEvent);
if (invokedEvent.error().isPresent()) {
@@ -357,7 +352,7 @@ public class StreamsAssignmentInterface {
}
}
- private void process(final StreamsOnAllTasksLostCallbackNeededEvent
event) {
+ private void processStreamsOnAllTasksLostCallbackNeededEvent(final
StreamsOnAllTasksLostCallbackNeededEvent event) {
StreamsOnAllTasksLostCallbackCompletedEvent invokedEvent =
invokeOnAllTasksLostCallback(event.future());
applicationEventHandler.add(invokedEvent);
if (invokedEvent.error().isPresent()) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index f36852c091a..e2cf2526d38 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -572,7 +572,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
}
private void process(final StreamsOnTasksRevokedCallbackCompletedEvent
event) {
- if (!requestManagers.streamsMembershipManager.isPresent()) {
+ if (requestManagers.streamsMembershipManager.isEmpty()) {
log.warn("An internal error occurred; the Streams membership
manager was not present, so the notification " +
"of the onTasksRevoked callback execution could not be sent");
return;
@@ -581,7 +581,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
}
private void process(final StreamsOnTasksAssignedCallbackCompletedEvent
event) {
- if (!requestManagers.streamsMembershipManager.isPresent()) {
+ if (requestManagers.streamsMembershipManager.isEmpty()) {
log.warn("An internal error occurred; the Streams membership
manager was not present, so the notification " +
"of the onTasksAssigned callback execution could not be sent");
return;
@@ -590,7 +590,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
}
private void process(final StreamsOnAllTasksLostCallbackCompletedEvent
event) {
- if (!requestManagers.streamsMembershipManager.isPresent()) {
+ if (requestManagers.streamsMembershipManager.isEmpty()) {
log.warn("An internal error occurred; the Streams membership
manager was not present, so the notification " +
"of the onAllTasksLost callback execution could not be sent");
return;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 4f22ed64c3b..6006bcb286a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -349,28 +349,9 @@ class StreamsGroupHeartbeatRequestManagerTest {
assertEquals(TEST_MEMBER_EPOCH, response.memberEpoch());
assertEquals(TEST_THROTTLE_TIME_MS, response.throttleTimeMs());
assertEquals(1000, response.heartbeatIntervalMs());
-// final List<TopicPartitions> tps = response.assign.topicPartitions();
-// assertEquals(2, tps.size());
-// assertEquals(Set.of(uuid0, uuid1),
tps.stream().map(TopicPartitions::topicId).collect(Collectors.toSet()));
-// assertEquals(Collections.singletonList(0), tps.get(0).partitions());
-// assertEquals(Collections.singletonList(0), tps.get(1).partitions());
-
-// final Assignment targetAssignment =
streamsAssignmentInterface.targetAssignment.get();
-// assertEquals(1, targetAssignment.activeTasks.size());
-// final TaskId activeTaskId =
targetAssignment.activeTasks.stream().findFirst().get();
-// assertEquals(activeTaskId.subtopologyId(), "0");
-// assertEquals(activeTaskId.partitionId(), 0);
-//
-// assertEquals(1, targetAssignment.standbyTasks.size());
-// final TaskId standbyTaskId =
targetAssignment.standbyTasks.stream().findFirst().get();
-// assertEquals(standbyTaskId.subtopologyId(), "1");
-// assertEquals(standbyTaskId.partitionId(), 1);
-//
-// assertEquals(1, targetAssignment.warmupTasks.size());
-// final TaskId warmupTaskId =
targetAssignment.warmupTasks.stream().findFirst().get();
-// assertEquals(warmupTaskId.subtopologyId(), "2");
-// assertEquals(warmupTaskId.partitionId(), 2);
-
+ assertEquals(data.activeTasks(), response.activeTasks());
+ assertEquals(data.standbyTasks(), response.standbyTasks());
+ assertEquals(data.warmupTasks(), response.warmupTasks());
}
private void mockResponse(final StreamsGroupHeartbeatResponseData data) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e5208bdea97..721eb5aabc7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -352,7 +352,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
private final Queue<StreamsException> nonFatalExceptionsToHandle;
// These are used only with the Streams Rebalance Protocol client
- private final StreamsAssignmentInterface streamsAssignmentInterface;
+ private final Optional<StreamsAssignmentInterface>
streamsAssignmentInterface;
private final StreamsMetadataState streamsMetadataState;
// These are used to signal from outside the stream thread, but the
variables themselves are internal to the thread
@@ -491,23 +491,28 @@ public class StreamThread extends Thread implements
ProcessingThread {
}
final Consumer<byte[], byte[]> mainConsumer;
- final StreamsAssignmentInterface streamsAssignmentInterface;
+ final Optional<StreamsAssignmentInterface> streamsAssignmentInterface;
if
(config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name))
{
if (topologyMetadata.hasNamedTopologies()) {
throw new IllegalStateException("Named topologies and the
CONSUMER protocol cannot be used at the same time.");
}
log.info("Streams rebalance protocol enabled");
- streamsAssignmentInterface =
- initAssignmentInterface(processId,
- config,
-
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)),
- topologyMetadata);
-
- mainConsumer =
clientSupplier.getStreamsRebalanceProtocolConsumer(consumerConfigs,
streamsAssignmentInterface);
+ streamsAssignmentInterface = Optional.of(
+ initAssignmentInterface(
+ processId,
+ config,
+
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)),
+ topologyMetadata
+ )
+ );
+ mainConsumer = clientSupplier.getStreamsRebalanceProtocolConsumer(
+ consumerConfigs,
+ streamsAssignmentInterface.get()
+ );
} else {
mainConsumer = clientSupplier.getConsumer(consumerConfigs);
- streamsAssignmentInterface = null;
+ streamsAssignmentInterface = Optional.empty();
}
taskManager.setMainConsumer(mainConsumer);
@@ -693,7 +698,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean>
streamsUncaughtExceptionHandler,
final java.util.function.Consumer<Long> cacheResizer,
- final StreamsAssignmentInterface
streamsAssignmentInterface,
+ final Optional<StreamsAssignmentInterface>
streamsAssignmentInterface,
final StreamsMetadataState streamsMetadataState) {
super(threadId);
this.stateLock = new Object();
@@ -716,11 +721,11 @@ public class StreamThread extends Thread implements
ProcessingThread {
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
this.cacheResizer = cacheResizer;
this.streamsAssignmentInterface = streamsAssignmentInterface;
- if (streamsAssignmentInterface != null) {
-
streamsAssignmentInterface.setOnTasksRevokedCallback(this::onTasksRevoked);
-
streamsAssignmentInterface.setOnTasksAssignedCallback(this::onTasksAssigned);
-
streamsAssignmentInterface.setOnAllTasksLostCallback(this::onAllTasksLost);
- }
+ streamsAssignmentInterface.ifPresent(assignmentInterface -> {
+
assignmentInterface.setOnTasksRevokedCallback(this::onTasksRevoked);
+
assignmentInterface.setOnTasksAssignedCallback(this::onTasksAssigned);
+
assignmentInterface.setOnAllTasksLostCallback(this::onAllTasksLost);
+ });
this.streamsMetadataState = streamsMetadataState;
// The following sensors are created here but their references are not
stored in this object, since within
@@ -1391,14 +1396,15 @@ public class StreamThread extends Thread implements
ProcessingThread {
}
public void maybeHandleAssignmentFromStreamsRebalanceProtocol() {
- if (streamsAssignmentInterface != null) {
+ if (streamsAssignmentInterface.isPresent()) {
- if (streamsAssignmentInterface.shutdownRequested()) {
+ if (streamsAssignmentInterface.get().shutdownRequested()) {
assignmentErrorCode.set(AssignorError.SHUTDOWN_REQUESTED.code());
}
// Process metadata from Streams Rebalance Protocol
- final Map<StreamsAssignmentInterface.HostInfo,
List<TopicPartition>> partitionsByEndpoint =
streamsAssignmentInterface.partitionsByHost.get();
+ final Map<StreamsAssignmentInterface.HostInfo,
List<TopicPartition>> partitionsByEndpoint =
+ streamsAssignmentInterface.get().partitionsByHost.get();
final Map<HostInfo, Set<TopicPartition>> convertedHostInfoMap =
new HashMap<>();
partitionsByEndpoint.forEach((hostInfo, topicPartitions) ->
convertedHostInfoMap.put(new HostInfo(hostInfo.host,
hostInfo.port), new HashSet<>(topicPartitions)));
@@ -1409,7 +1415,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
);
// Process assignment from Streams Rebalance Protocol
- streamsAssignmentInterface.processStreamsRebalanceEvents();
+ streamsAssignmentInterface.get().processStreamsRebalanceEvents();
}
}
@@ -1488,7 +1494,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
return taskIdStream
.collect(Collectors.toMap(
this::toTaskId,
- task -> toTopicPartitions(task,
streamsAssignmentInterface.subtopologyMap().get(task.subtopologyId()))
+ task -> toTopicPartitions(task,
streamsAssignmentInterface.get().subtopologyMap().get(task.subtopologyId()))
));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 9efb5780732..b71fe0af407 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1448,7 +1448,7 @@ public class StreamThreadTest {
null,
HANDLER,
null,
- null,
+ Optional.empty(),
null).updateThreadMetadata(adminClientId(CLIENT_ID));
final StreamsException thrown = assertThrows(StreamsException.class,
thread::run);
@@ -2675,7 +2675,7 @@ public class StreamThreadTest {
null,
HANDLER,
null,
- null,
+ Optional.empty(),
null) {
@Override
void runOnceWithProcessingThreads() {
@@ -2734,7 +2734,7 @@ public class StreamThreadTest {
null,
HANDLER,
null,
- null,
+ Optional.empty(),
null) {
@Override
void runOnceWithProcessingThreads() {
@@ -2802,7 +2802,7 @@ public class StreamThreadTest {
null,
HANDLER,
null,
- null,
+ Optional.empty(),
null) {
@Override
void runOnceWithProcessingThreads() {
@@ -2866,7 +2866,7 @@ public class StreamThreadTest {
null,
HANDLER,
null,
- null,
+ Optional.empty(),
null) {
@Override
void runOnceWithProcessingThreads() {
@@ -2927,7 +2927,7 @@ public class StreamThreadTest {
null,
HANDLER,
null,
- null,
+ Optional.empty(),
null) {
@Override
void runOnceWithProcessingThreads() {
@@ -3161,7 +3161,7 @@ public class StreamThreadTest {
null,
HANDLER,
null,
- null,
+ Optional.empty(),
null);
final MetricName testMetricName = new MetricName("test_metric", "",
"", new HashMap<>());
final Metric testMetric = new KafkaMetric(
@@ -3218,7 +3218,7 @@ public class StreamThreadTest {
null,
(e, b) -> { },
null,
- null,
+ Optional.empty(),
null) {
@Override
void runOnceWithProcessingThreads() {
@@ -3598,7 +3598,7 @@ public class StreamThreadTest {
null,
null,
null,
- null,
+ Optional.empty(),
null);
}
@@ -3720,7 +3720,7 @@ public class StreamThreadTest {
null,
HANDLER,
null,
- null,
+ Optional.empty(),
null);
}