This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 15ad3016b7e KAFKA-19140 ConnectAssignor#performAssignment parameter
can be replace to ConnectProtocolCompatibility (#19476)
15ad3016b7e is described below
commit 15ad3016b7e8dcfd9c893b9274e5aee69ef5f7b6
Author: Ken Huang <[email protected]>
AuthorDate: Fri Jun 20 03:07:57 2025 +0800
KAFKA-19140 ConnectAssignor#performAssignment parameter can be replace to
ConnectProtocolCompatibility (#19476)
The protocol type; for Connect assignors this is "eager", "compatible",
or "sessioned"
Since `ConnectAssignor` is an interface and the protocol parameter is
restricted to "eager", "compatible", or "sessioned", it aligns with the
existing ConnectProtocolCompatibility enum. Therefore, we can update the
code to use `ConnectProtocolCompatibility` directly.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../apache/kafka/connect/runtime/distributed/ConnectAssignor.java | 4 ++--
.../apache/kafka/connect/runtime/distributed/EagerAssignor.java | 2 +-
.../runtime/distributed/IncrementalCooperativeAssignor.java | 4 ++--
.../kafka/connect/runtime/distributed/WorkerCoordinator.java | 7 ++++---
.../runtime/distributed/IncrementalCooperativeAssignorTest.java | 4 ++--
5 files changed, 11 insertions(+), 10 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
index 1436460d1a9..d91e1fec85f 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
@@ -32,12 +32,12 @@ public interface ConnectAssignor {
* method computes an assignment of connectors and tasks among the members
of the worker group.
*
* @param leaderId the leader of the group
- * @param protocol the protocol type; for Connect assignors this is
"eager", "compatible", or "sessioned"
+ * @param protocol the protocol type
* @param allMemberMetadata the metadata of all the active workers of the
group
* @param coordinator the worker coordinator that runs this assignor
* @return the assignment of connectors and tasks to workers
*/
- Map<String, ByteBuffer> performAssignment(String leaderId, String protocol,
+ Map<String, ByteBuffer> performAssignment(String leaderId,
ConnectProtocolCompatibility protocol,
List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata,
WorkerCoordinator coordinator);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
index 0663d9e5710..2d8dba5d758 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
@@ -52,7 +52,7 @@ public class EagerAssignor implements ConnectAssignor {
}
@Override
- public Map<String, ByteBuffer> performAssignment(String leaderId, String
protocol,
+ public Map<String, ByteBuffer> performAssignment(String leaderId,
ConnectProtocolCompatibility protocol,
List<JoinGroupResponseMember> allMemberMetadata,
WorkerCoordinator
coordinator) {
log.debug("Performing task assignment");
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index 6be247cfc82..e5bd9097033 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -97,7 +97,7 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
}
@Override
- public Map<String, ByteBuffer> performAssignment(String leaderId, String
protocol,
+ public Map<String, ByteBuffer> performAssignment(String leaderId,
ConnectProtocolCompatibility protocol,
List<JoinGroupResponseMember> allMemberMetadata,
WorkerCoordinator
coordinator) {
log.debug("Performing task assignment");
@@ -117,7 +117,7 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
log.debug("Max config offset root: {}, local snapshot config offsets
root: {}",
maxOffset, coordinator.configSnapshot().offset());
- short protocolVersion =
ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion();
+ short protocolVersion = protocol.protocolVersion();
Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator);
if (leaderOffset == null) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 99daf19d1d9..871fe3b33e4 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -229,9 +229,10 @@ public class WorkerCoordinator extends AbstractCoordinator
implements Closeable
if (skipAssignment)
throw new IllegalStateException("Can't skip assignment because
Connect does not support static membership.");
- return ConnectProtocolCompatibility.fromProtocol(protocol) == EAGER
- ? eagerAssignor.performAssignment(leaderId, protocol,
allMemberMetadata, this)
- : incrementalAssignor.performAssignment(leaderId, protocol,
allMemberMetadata, this);
+ ConnectProtocolCompatibility protocolCompatibility =
ConnectProtocolCompatibility.fromProtocol(protocol);
+ return protocolCompatibility == EAGER
+ ? eagerAssignor.performAssignment(leaderId,
protocolCompatibility, allMemberMetadata, this)
+ : incrementalAssignor.performAssignment(leaderId,
protocolCompatibility, allMemberMetadata, this);
}
@Override
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index 8e10a07a015..26ee2331b50 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -1236,7 +1236,7 @@ public class IncrementalCooperativeAssignorTest {
when(coordinator.configSnapshot()).thenReturn(configState());
Map<String, ByteBuffer> serializedAssignments =
assignor.performAssignment(
leader,
- ConnectProtocolCompatibility.COMPATIBLE.protocol(),
+ ConnectProtocolCompatibility.COMPATIBLE,
memberMetadata,
coordinator
);
@@ -1277,7 +1277,7 @@ public class IncrementalCooperativeAssignorTest {
when(coordinator.configSnapshot()).thenReturn(configState());
Map<String, ByteBuffer> serializedAssignments =
assignor.performAssignment(
leader,
- ConnectProtocolCompatibility.SESSIONED.protocol(),
+ ConnectProtocolCompatibility.SESSIONED,
memberMetadata,
coordinator
);