This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 562b8006f97 KAFKA-15045: (KIP-924 pt. 25) Rename old internal 
StickyTaskAssignor to LegacyStickyTaskAssignor (#16322)
562b8006f97 is described below

commit 562b8006f97d6fcd3327a3699337921e6a8f8725
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Jun 13 11:27:50 2024 -0700

    KAFKA-15045: (KIP-924 pt. 25) Rename old internal StickyTaskAssignor to 
LegacyStickyTaskAssignor (#16322)
    
    To avoid confusion in 3.8/until we fully remove all the old task assignors 
and internal config, we should rename the old internal assignor classes like 
the StickyTaskAssignor so that they won't be mixed up with the new version of 
the assignor (which is also named StickyTaskAssignor)
    
    Reviewers: Bruno Cadonna <[email protected]>, Josep Prat 
<[email protected]>
---
 .../processor/internals/StreamsPartitionAssignor.java      |  6 +++---
 .../internals/assignment/FallbackPriorTaskAssignor.java    |  4 ++--
 ...ickyTaskAssignor.java => LegacyStickyTaskAssignor.java} | 13 +++++++++----
 .../java/org/apache/kafka/streams/StreamsConfigTest.java   |  4 ++--
 .../processor/internals/StreamsAssignmentScaleTest.java    | 12 ++++++------
 .../processor/internals/StreamsPartitionAssignorTest.java  |  6 +++---
 .../kafka/streams/processor/internals/TaskSuite.java       |  4 ++--
 ...AssignorTest.java => LegacyStickyTaskAssignorTest.java} | 14 +++++++-------
 .../tests/streams/streams_broker_down_resilience_test.py   |  4 ++--
 .../tests/streams/streams_standby_replica_test.py          |  2 +-
 10 files changed, 37 insertions(+), 32 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index e0ddd088377..4a348e194d1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -59,10 +59,10 @@ import 
org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import 
org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
 import 
org.apache.kafka.streams.processor.internals.assignment.DefaultTaskTopicPartition;
 import 
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.RackUtils;
 import 
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
-import 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import 
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo;
@@ -861,8 +861,8 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
 
     private LegacyTaskAssignor createTaskAssignor(final boolean 
lagComputationSuccessful) {
         final LegacyTaskAssignor taskAssignor = 
legacyTaskAssignorSupplier.get();
-        if (taskAssignor instanceof StickyTaskAssignor) {
-            // special case: to preserve pre-existing behavior, we invoke the 
StickyTaskAssignor
+        if (taskAssignor instanceof LegacyStickyTaskAssignor) {
+            // special case: to preserve pre-existing behavior, we invoke the 
LegacyStickyTaskAssignor
             // whether or not lag computation failed.
             return taskAssignor;
         } else if (lagComputationSuccessful) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
index d7a07c6e200..c3d6b7880ed 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
@@ -32,10 +32,10 @@ import 
org.apache.kafka.streams.processor.assignment.ProcessId;
  * 2. always return true, indicating that a follow-up rebalance is needed
  */
 public class FallbackPriorTaskAssignor implements LegacyTaskAssignor {
-    private final StickyTaskAssignor delegate;
+    private final LegacyStickyTaskAssignor delegate;
 
     public FallbackPriorTaskAssignor() {
-        delegate = new StickyTaskAssignor(true);
+        delegate = new LegacyStickyTaskAssignor(true);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignor.java
similarity index 95%
rename from 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
rename to 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignor.java
index 94c5dfce184..c61759109e1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignor.java
@@ -40,9 +40,14 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
-public class StickyTaskAssignor implements LegacyTaskAssignor {
+// Note: as of 3.8, this class has been renamed from StickyTaskAssignor to 
LegacyStickyTaskAssignor,
+// and a new StickyTaskAssignor implementation was added that implements the 
new TaskAssignor interface.
+// If you were previously plugging in the old StickyTaskAssignor via the 
internal.task.assignor.class config,
+// you should migrate to the new TaskAssignor interface by removing the 
internal config and instead
+// passing in the new StickyTaskAssignor class to the new public 
task.assignor.class config
+public class LegacyStickyTaskAssignor implements LegacyTaskAssignor {
 
-    private static final Logger log = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+    private static final Logger log = 
LoggerFactory.getLogger(LegacyStickyTaskAssignor.class);
 
     // For stateful tasks, by default we want to maintain stickiness. So we 
have higher non_overlap_cost
     private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
@@ -59,11 +64,11 @@ public class StickyTaskAssignor implements 
LegacyTaskAssignor {
 
     private final boolean mustPreserveActiveTaskAssignment;
 
-    public StickyTaskAssignor() {
+    public LegacyStickyTaskAssignor() {
         this(false);
     }
 
-    StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
+    LegacyStickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
         this.mustPreserveActiveTaskAssignment = 
mustPreserveActiveTaskAssignment;
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 8c89132ae2f..74b4c7643fd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1460,8 +1460,8 @@ public class StreamsConfigTest {
 
     @Test
     public void shouldReturnTaskAssignorClass() {
-        props.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, 
"StickyTaskAssignor");
-        assertEquals("StickyTaskAssignor", new 
StreamsConfig(props).getString(TASK_ASSIGNOR_CLASS_CONFIG));
+        props.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, 
"LegacyStickyTaskAssignor");
+        assertEquals("LegacyStickyTaskAssignor", new 
StreamsConfig(props).getString(TASK_ASSIGNOR_CLASS_CONFIG));
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
index c782eb3dbf6..82456fef3bd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
@@ -34,8 +34,8 @@ import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import 
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
-import 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -102,26 +102,26 @@ public class StreamsAssignmentScaleTest {
         completeLargeAssignment(1_000, 10, 1000, 1, 
HighAvailabilityTaskAssignor.class);
     }
 
-    /* StickyTaskAssignor tests */
+    /* LegacyStickyTaskAssignor tests */
 
     @Test(timeout = 300 * 1000)
     public void testStickyTaskAssignorLargePartitionCount() {
-        completeLargeAssignment(2_000, 2, 1, 1, StickyTaskAssignor.class);
+        completeLargeAssignment(2_000, 2, 1, 1, 
LegacyStickyTaskAssignor.class);
     }
 
     @Test(timeout = 300 * 1000)
     public void testStickyTaskAssignorLargeNumConsumers() {
-        completeLargeAssignment(1_000, 1_000, 1, 1, StickyTaskAssignor.class);
+        completeLargeAssignment(1_000, 1_000, 1, 1, 
LegacyStickyTaskAssignor.class);
     }
 
     @Test(timeout = 300 * 1000)
     public void testStickyTaskAssignorManyStandbys() {
-        completeLargeAssignment(1_000, 100, 1, 20, StickyTaskAssignor.class);
+        completeLargeAssignment(1_000, 100, 1, 20, 
LegacyStickyTaskAssignor.class);
     }
 
     @Test(timeout = 300 * 1000)
     public void testStickyTaskAssignorManyThreadsPerClient() {
-        completeLargeAssignment(1_000, 10, 1000, 1, StickyTaskAssignor.class);
+        completeLargeAssignment(1_000, 10, 1000, 1, 
LegacyStickyTaskAssignor.class);
     }
 
     /* FallbackPriorTaskAssignor tests */
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index ac5b3180081..12636c71fdb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -67,8 +67,8 @@ import 
org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import 
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
-import 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import 
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
 import org.apache.kafka.streams.state.HostInfo;
@@ -344,8 +344,8 @@ public class StreamsPartitionAssignorTest {
         return asList(
             new Object[]{HighAvailabilityTaskAssignor.class, true, null},
             new Object[]{HighAvailabilityTaskAssignor.class, false, null},
-            new Object[]{StickyTaskAssignor.class, true, null},
-            new Object[]{StickyTaskAssignor.class, false, null},
+            new Object[]{LegacyStickyTaskAssignor.class, true, null},
+            new Object[]{LegacyStickyTaskAssignor.class, false, null},
             new Object[]{FallbackPriorTaskAssignor.class, true, null},
             new Object[]{FallbackPriorTaskAssignor.class, false, null},
             new Object[]{null, false, 
org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor.class},
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
index 25c4d71d41a..c963b549057 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest;
+import 
org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignorTest;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetricsTest;
-import 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -35,7 +35,7 @@ import org.junit.runners.Suite;
                         GlobalStateTaskTest.class,
                         TaskManagerTest.class,
                         TaskMetricsTest.class,
-                        StickyTaskAssignorTest.class,
+                        LegacyStickyTaskAssignorTest.class,
                         StreamsPartitionAssignorTest.class,
                         StandbyTaskCreationIntegrationTest.class,
                     })
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
similarity index 99%
rename from 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
rename to 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
index e9b1be48fbf..e57e91bf0a9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
@@ -104,7 +104,7 @@ import static org.junit.Assert.assertFalse;
 import static org.mockito.Mockito.spy;
 
 @RunWith(Parameterized.class)
-public class StickyTaskAssignorTest {
+public class LegacyStickyTaskAssignorTest {
 
     private final List<Integer> expectedTopicGroupIds = asList(1, 2);
     private final Time time = new MockTime();
@@ -807,7 +807,7 @@ public class StickyTaskAssignorTest {
             time
         );
 
-        final boolean probingRebalanceNeeded = new 
StickyTaskAssignor(true).assign(
+        final boolean probingRebalanceNeeded = new 
LegacyStickyTaskAssignor(true).assign(
             clients,
             new HashSet<>(taskIds),
             new HashSet<>(taskIds),
@@ -857,7 +857,7 @@ public class StickyTaskAssignorTest {
             time
         );
 
-        final boolean probingRebalanceNeeded = new StickyTaskAssignor().assign(
+        final boolean probingRebalanceNeeded = new 
LegacyStickyTaskAssignor().assign(
             clients,
             new HashSet<>(taskIds),
             new HashSet<>(statefulTaskIds),
@@ -937,7 +937,7 @@ public class StickyTaskAssignorTest {
             tpSize, partitionSize, maxCapacity, false, statefulTasks);
 
 
-        final boolean probing = new StickyTaskAssignor().assign(
+        final boolean probing = new LegacyStickyTaskAssignor().assign(
             clientStateMap,
             taskIds,
             statefulTasks,
@@ -1005,7 +1005,7 @@ public class StickyTaskAssignorTest {
         final SortedMap<ProcessId, ClientState> clientStateMap = 
getRandomClientState(clientSize,
             tpSize, partitionSize, maxCapacity, false, statefulTasks);
 
-        new StickyTaskAssignor().assign(
+        new LegacyStickyTaskAssignor().assign(
             clientStateMap,
             taskIds,
             statefulTasks,
@@ -1047,7 +1047,7 @@ public class StickyTaskAssignorTest {
             time
         ));
 
-        new StickyTaskAssignor().assign(
+        new LegacyStickyTaskAssignor().assign(
             clientStateMapCopy,
             taskIds,
             statefulTasks,
@@ -1085,7 +1085,7 @@ public class StickyTaskAssignorTest {
     private boolean assign(final AssignmentConfigs configs, final 
RackAwareTaskAssignor rackAwareTaskAssignor, final TaskId... tasks) {
         final List<TaskId> taskIds = asList(tasks);
         Collections.shuffle(taskIds);
-        return new StickyTaskAssignor().assign(
+        return new LegacyStickyTaskAssignor().assign(
             clients,
             new HashSet<>(taskIds),
             new HashSet<>(taskIds),
diff --git 
a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py 
b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 5057c3d2793..9eeeea3c98e 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -157,7 +157,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         # TODO KIP-441: consider rewriting the test for 
HighAvailabilityTaskAssignor
         configs = self.get_configs(
             extra_configs=",application.id=shutdown_with_broker_down" +
-                          
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
+                          
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
         )
 
         processor = StreamsBrokerDownResilienceService(self.test_context, 
self.kafka, configs)
@@ -236,7 +236,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         # TODO KIP-441: consider rewriting the test for 
HighAvailabilityTaskAssignor
         configs = self.get_configs(
             extra_configs=",application.id=failover_with_broker_down" +
-                          
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
+                          
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
         )
 
         processor = StreamsBrokerDownResilienceService(self.test_context, 
self.kafka, configs)
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py 
b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index 78df8d3ebc3..d9804408df8 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -51,7 +51,7 @@ class StreamsStandbyTask(BaseStreamsTest):
     def test_standby_tasks_rebalance(self, metadata_quorum, 
use_new_coordinator=False):
         # TODO KIP-441: consider rewriting the test for 
HighAvailabilityTaskAssignor
         configs = self.get_configs(
-            
",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
 % (
+            
",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
 % (
             self.streams_source_topic,
             self.streams_sink_topic_1,
             self.streams_sink_topic_2

Reply via email to