This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 15ad3016b7e KAFKA-19140 ConnectAssignor#performAssignment parameter 
can be replace to ConnectProtocolCompatibility (#19476)
15ad3016b7e is described below

commit 15ad3016b7e8dcfd9c893b9274e5aee69ef5f7b6
Author: Ken Huang <[email protected]>
AuthorDate: Fri Jun 20 03:07:57 2025 +0800

    KAFKA-19140 ConnectAssignor#performAssignment parameter can be replace to 
ConnectProtocolCompatibility (#19476)
    
    The protocol type; for Connect assignors this is "eager", "compatible",
    or "sessioned"
    
    Since `ConnectAssignor` is an interface and the protocol parameter is
    restricted to "eager", "compatible", or "sessioned", it aligns with the
    existing ConnectProtocolCompatibility enum. Therefore, we can update the
    code to use `ConnectProtocolCompatibility` directly.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/connect/runtime/distributed/ConnectAssignor.java  | 4 ++--
 .../apache/kafka/connect/runtime/distributed/EagerAssignor.java    | 2 +-
 .../runtime/distributed/IncrementalCooperativeAssignor.java        | 4 ++--
 .../kafka/connect/runtime/distributed/WorkerCoordinator.java       | 7 ++++---
 .../runtime/distributed/IncrementalCooperativeAssignorTest.java    | 4 ++--
 5 files changed, 11 insertions(+), 10 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
index 1436460d1a9..d91e1fec85f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
@@ -32,12 +32,12 @@ public interface ConnectAssignor {
      * method computes an assignment of connectors and tasks among the members 
of the worker group.
      *
      * @param leaderId the leader of the group
-     * @param protocol the protocol type; for Connect assignors this is 
"eager", "compatible", or "sessioned"
+     * @param protocol the protocol type
      * @param allMemberMetadata the metadata of all the active workers of the 
group
      * @param coordinator the worker coordinator that runs this assignor
      * @return the assignment of connectors and tasks to workers
      */
-    Map<String, ByteBuffer> performAssignment(String leaderId, String protocol,
+    Map<String, ByteBuffer> performAssignment(String leaderId, 
ConnectProtocolCompatibility protocol,
                                               
List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata,
                                               WorkerCoordinator coordinator);
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
index 0663d9e5710..2d8dba5d758 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
@@ -52,7 +52,7 @@ public class EagerAssignor implements ConnectAssignor {
     }
 
     @Override
-    public Map<String, ByteBuffer> performAssignment(String leaderId, String 
protocol,
+    public Map<String, ByteBuffer> performAssignment(String leaderId, 
ConnectProtocolCompatibility protocol,
                                                      
List<JoinGroupResponseMember> allMemberMetadata,
                                                      WorkerCoordinator 
coordinator) {
         log.debug("Performing task assignment");
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index 6be247cfc82..e5bd9097033 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -97,7 +97,7 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
     }
 
     @Override
-    public Map<String, ByteBuffer> performAssignment(String leaderId, String 
protocol,
+    public Map<String, ByteBuffer> performAssignment(String leaderId, 
ConnectProtocolCompatibility protocol,
                                                      
List<JoinGroupResponseMember> allMemberMetadata,
                                                      WorkerCoordinator 
coordinator) {
         log.debug("Performing task assignment");
@@ -117,7 +117,7 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
         log.debug("Max config offset root: {}, local snapshot config offsets 
root: {}",
                   maxOffset, coordinator.configSnapshot().offset());
 
-        short protocolVersion = 
ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion();
+        short protocolVersion = protocol.protocolVersion();
 
         Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator);
         if (leaderOffset == null) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 99daf19d1d9..871fe3b33e4 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -229,9 +229,10 @@ public class WorkerCoordinator extends AbstractCoordinator 
implements Closeable
         if (skipAssignment)
             throw new IllegalStateException("Can't skip assignment because 
Connect does not support static membership.");
 
-        return ConnectProtocolCompatibility.fromProtocol(protocol) == EAGER
-               ? eagerAssignor.performAssignment(leaderId, protocol, 
allMemberMetadata, this)
-               : incrementalAssignor.performAssignment(leaderId, protocol, 
allMemberMetadata, this);
+        ConnectProtocolCompatibility protocolCompatibility = 
ConnectProtocolCompatibility.fromProtocol(protocol);
+        return protocolCompatibility == EAGER
+               ? eagerAssignor.performAssignment(leaderId, 
protocolCompatibility, allMemberMetadata, this)
+               : incrementalAssignor.performAssignment(leaderId, 
protocolCompatibility, allMemberMetadata, this);
     }
 
     @Override
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index 8e10a07a015..26ee2331b50 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -1236,7 +1236,7 @@ public class IncrementalCooperativeAssignorTest {
         when(coordinator.configSnapshot()).thenReturn(configState());
         Map<String, ByteBuffer> serializedAssignments = 
assignor.performAssignment(
                 leader,
-                ConnectProtocolCompatibility.COMPATIBLE.protocol(),
+                ConnectProtocolCompatibility.COMPATIBLE,
                 memberMetadata,
                 coordinator
         );
@@ -1277,7 +1277,7 @@ public class IncrementalCooperativeAssignorTest {
         when(coordinator.configSnapshot()).thenReturn(configState());
         Map<String, ByteBuffer> serializedAssignments = 
assignor.performAssignment(
                 leader,
-                ConnectProtocolCompatibility.SESSIONED.protocol(),
+                ConnectProtocolCompatibility.SESSIONED,
                 memberMetadata,
                 coordinator
         );

Reply via email to