This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 103ff5c0f06 KAFKA-15045: (KIP-924 pt. 24) internal TaskAssignor rename 
to LegacyTaskAssignor (#16318)
103ff5c0f06 is described below

commit 103ff5c0f06a35c725c87d534ad591e9e95534ff
Author: Antoine Pourchet <[email protected]>
AuthorDate: Thu Jun 13 01:32:39 2024 -0600

    KAFKA-15045: (KIP-924 pt. 24) internal TaskAssignor rename to 
LegacyTaskAssignor (#16318)
    
    Since the new public API for TaskAssignor shared a name, this rename will 
prevent users from confusing the internal definition with the public one.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../processor/internals/StreamsPartitionAssignor.java   | 12 ++++++------
 .../internals/assignment/AssignorConfiguration.java     |  4 ++--
 .../internals/assignment/FallbackPriorTaskAssignor.java |  2 +-
 .../assignment/HighAvailabilityTaskAssignor.java        |  2 +-
 .../{TaskAssignor.java => LegacyTaskAssignor.java}      |  2 +-
 .../internals/assignment/StandbyTaskAssignor.java       |  2 +-
 .../internals/assignment/StickyTaskAssignor.java        |  2 +-
 .../integration/TaskAssignorIntegrationTest.java        | 17 +++++++++--------
 .../processor/internals/StreamsAssignmentScaleTest.java |  4 ++--
 .../internals/StreamsPartitionAssignorTest.java         |  6 +++---
 10 files changed, 27 insertions(+), 26 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 887ef86faf5..e0ddd088377 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -64,7 +64,7 @@ import 
org.apache.kafka.streams.processor.internals.assignment.RackUtils;
 import 
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
 import 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo;
 import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
@@ -224,7 +224,7 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
 
     private 
Supplier<Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor>>
         customTaskAssignorSupplier;
-    private Supplier<TaskAssignor> internalTaskAssignorSupplier;
+    private Supplier<LegacyTaskAssignor> legacyTaskAssignorSupplier;
     private byte uniqueField;
     private Map<String, String> clientTags;
 
@@ -259,7 +259,7 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         copartitionedTopicsEnforcer = 
assignorConfiguration.copartitionedTopicsEnforcer();
         rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
         customTaskAssignorSupplier = assignorConfiguration::customTaskAssignor;
-        internalTaskAssignorSupplier = assignorConfiguration::taskAssignor;
+        legacyTaskAssignorSupplier = assignorConfiguration::taskAssignor;
         assignmentListener = assignorConfiguration.assignmentListener();
         uniqueField = 0;
         clientTags = referenceContainer.clientTags;
@@ -817,7 +817,7 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
             };
         } else {
             customTaskAssignmentListener = (assignment, subscription) -> { };
-            final TaskAssignor taskAssignor = 
createTaskAssignor(lagComputationSuccessful);
+            final LegacyTaskAssignor taskAssignor = 
createTaskAssignor(lagComputationSuccessful);
             final RackAwareTaskAssignor rackAwareTaskAssignor = new 
RackAwareTaskAssignor(
                 fullMetadata,
                 partitionsForTask,
@@ -859,8 +859,8 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         return customTaskAssignmentListener;
     }
 
-    private TaskAssignor createTaskAssignor(final boolean 
lagComputationSuccessful) {
-        final TaskAssignor taskAssignor = internalTaskAssignorSupplier.get();
+    private LegacyTaskAssignor createTaskAssignor(final boolean 
lagComputationSuccessful) {
+        final LegacyTaskAssignor taskAssignor = 
legacyTaskAssignorSupplier.get();
         if (taskAssignor instanceof StickyTaskAssignor) {
             // special case: to preserve pre-existing behavior, we invoke the 
StickyTaskAssignor
             // whether or not lag computation failed.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 1971642942a..311863630af 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -242,9 +242,9 @@ public final class AssignorConfiguration {
         return AssignmentConfigs.of(streamsConfig);
     }
 
-    public TaskAssignor taskAssignor() {
+    public LegacyTaskAssignor taskAssignor() {
         try {
-            return Utils.newInstance(internalTaskAssignorClass, 
TaskAssignor.class);
+            return Utils.newInstance(internalTaskAssignorClass, 
LegacyTaskAssignor.class);
         } catch (final ClassNotFoundException e) {
             throw new IllegalArgumentException(
                 "Expected an instantiable class name for " + 
INTERNAL_TASK_ASSIGNOR_CLASS,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
index 93dcd6192b6..d7a07c6e200 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
@@ -31,7 +31,7 @@ import 
org.apache.kafka.streams.processor.assignment.ProcessId;
  * 1. ignore the task lags in the ClientState map
  * 2. always return true, indicating that a follow-up rebalance is needed
  */
-public class FallbackPriorTaskAssignor implements TaskAssignor {
+public class FallbackPriorTaskAssignor implements LegacyTaskAssignor {
     private final StickyTaskAssignor delegate;
 
     public FallbackPriorTaskAssignor() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
index bda199ea335..272cd824b43 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
@@ -43,7 +43,7 @@ import static 
org.apache.kafka.streams.processor.internals.assignment.RackAwareT
 import static 
org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignActiveTaskMovements;
 import static 
org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignStandbyTaskMovements;
 
-public class HighAvailabilityTaskAssignor implements TaskAssignor {
+public class HighAvailabilityTaskAssignor implements LegacyTaskAssignor {
     private static final Logger log = 
LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
     public static final int DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST = 10;
     public static final int DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST = 1;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyTaskAssignor.java
similarity index 97%
rename from 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
rename to 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyTaskAssignor.java
index e01c13b59da..9a7485b64aa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyTaskAssignor.java
@@ -23,7 +23,7 @@ import java.util.Set;
 import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
 import org.apache.kafka.streams.processor.assignment.ProcessId;
 
-public interface TaskAssignor {
+public interface LegacyTaskAssignor {
     /**
      * @return whether the generated assignment requires a followup probing 
rebalance to satisfy all conditions
      */
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java
index 022eaae078c..4787ac65523 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
 import org.apache.kafka.streams.processor.assignment.ProcessId;
 
-interface StandbyTaskAssignor extends TaskAssignor {
+interface StandbyTaskAssignor extends LegacyTaskAssignor {
     default boolean isAllowedTaskMovement(final ClientState source, final 
ClientState destination) {
         return true;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index b6953613d89..94c5dfce184 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -40,7 +40,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
-public class StickyTaskAssignor implements TaskAssignor {
+public class StickyTaskAssignor implements LegacyTaskAssignor {
 
     private static final Logger log = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
index 9d7d298f831..3e29773a151 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
@@ -30,7 +30,7 @@ import 
org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
 import 
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.AfterClass;
@@ -78,7 +78,8 @@ public class TaskAssignorIntegrationTest {
     public TestName testName = new TestName();
 
     // Just a dummy implementation so we can check the config
-    public static final class MyTaskAssignor extends 
HighAvailabilityTaskAssignor implements TaskAssignor { }
+    public static final class MyLegacyTaskAssignor extends 
HighAvailabilityTaskAssignor implements
+        LegacyTaskAssignor { }
 
     @SuppressWarnings("unchecked")
     @Test
@@ -116,7 +117,7 @@ public class TaskAssignorIntegrationTest {
                 mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
                 mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 
"480000"),
                 mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, 
configuredAssignmentListener),
-                
mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, 
MyTaskAssignor.class.getName())
+                
mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, 
MyLegacyTaskAssignor.class.getName())
             )
         );
 
@@ -153,18 +154,18 @@ public class TaskAssignorIntegrationTest {
             assignmentListenerField.setAccessible(true);
             final AssignmentListener actualAssignmentListener = 
(AssignmentListener) assignmentListenerField.get(streamsPartitionAssignor);
 
-            final Field taskAssignorSupplierField = 
StreamsPartitionAssignor.class.getDeclaredField("internalTaskAssignorSupplier");
+            final Field taskAssignorSupplierField = 
StreamsPartitionAssignor.class.getDeclaredField("legacyTaskAssignorSupplier");
             taskAssignorSupplierField.setAccessible(true);
-            final Supplier<TaskAssignor> taskAssignorSupplier =
-                (Supplier<TaskAssignor>) 
taskAssignorSupplierField.get(streamsPartitionAssignor);
-            final TaskAssignor taskAssignor = taskAssignorSupplier.get();
+            final Supplier<LegacyTaskAssignor> taskAssignorSupplier =
+                (Supplier<LegacyTaskAssignor>) 
taskAssignorSupplierField.get(streamsPartitionAssignor);
+            final LegacyTaskAssignor taskAssignor = taskAssignorSupplier.get();
 
             assertThat(configs.numStandbyReplicas(), is(5));
             assertThat(configs.acceptableRecoveryLag(), is(6L));
             assertThat(configs.maxWarmupReplicas(), is(7));
             assertThat(configs.probingRebalanceIntervalMs(), is(480000L));
             assertThat(actualAssignmentListener, 
sameInstance(configuredAssignmentListener));
-            assertThat(taskAssignor, instanceOf(MyTaskAssignor.class));
+            assertThat(taskAssignor, instanceOf(MyLegacyTaskAssignor.class));
         }
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
index b3c7131341b..c782eb3dbf6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
@@ -36,7 +36,7 @@ import 
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTask
 import 
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
 import 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockClientSupplier;
@@ -150,7 +150,7 @@ public class StreamsAssignmentScaleTest {
                                          final int numClients,
                                          final int numThreadsPerClient,
                                          final int numStandbys,
-                                         final Class<? extends TaskAssignor> 
taskAssignor) {
+                                         final Class<? extends 
LegacyTaskAssignor> taskAssignor) {
         final List<String> topic = singletonList("topic");
 
         final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 43c07ee3c28..ac5b3180081 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -70,7 +70,7 @@ import 
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityT
 import 
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
 import 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockClientSupplier;
@@ -237,7 +237,7 @@ public class StreamsPartitionAssignorTest {
     @Captor
     private ArgumentCaptor<Map<TopicPartition, PartitionInfo>> 
topicPartitionInfoCaptor;
     private final Map<String, Subscription> subscriptions = new HashMap<>();
-    private final Class<? extends TaskAssignor> internalTaskAssignor;
+    private final Class<? extends LegacyTaskAssignor> internalTaskAssignor;
     private final Class<? extends 
org.apache.kafka.streams.processor.assignment.TaskAssignor> customTaskAssignor;
     private final String rackAwareAssignorStrategy;
     private Map<String, String> clientTags;
@@ -354,7 +354,7 @@ public class StreamsPartitionAssignorTest {
         );
     }
 
-    public StreamsPartitionAssignorTest(final Class<? extends TaskAssignor> 
internalTaskAssignor,
+    public StreamsPartitionAssignorTest(final Class<? extends 
LegacyTaskAssignor> internalTaskAssignor,
                                         final boolean enableRackAwareAssignor,
                                         final Class<? extends 
org.apache.kafka.streams.processor.assignment.TaskAssignor> customTaskAssignor) 
{
         this.internalTaskAssignor = internalTaskAssignor;

Reply via email to