This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4333af5c9f8 KAFKA-15045: (KIP-924 pt. 25) Rename old internal
StickyTaskAssignor to LegacyStickyTaskAssignor (#16322)
4333af5c9f8 is described below
commit 4333af5c9f89e5e8bfd67f4919fe2958484f6a22
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Jun 13 11:27:50 2024 -0700
KAFKA-15045: (KIP-924 pt. 25) Rename old internal StickyTaskAssignor to
LegacyStickyTaskAssignor (#16322)
To avoid confusion in 3.8, and until we fully remove all the old task assignors
and the internal config, we should rename the old internal assignor classes, such
as the StickyTaskAssignor, so that they won't be mixed up with the new version of
the assignor (which is also named StickyTaskAssignor).
Reviewers: Bruno Cadonna <[email protected]>, Josep Prat
<[email protected]>
---
.../processor/internals/StreamsPartitionAssignor.java | 6 +++---
.../internals/assignment/FallbackPriorTaskAssignor.java | 4 ++--
...ickyTaskAssignor.java => LegacyStickyTaskAssignor.java} | 13 +++++++++----
.../java/org/apache/kafka/streams/StreamsConfigTest.java | 4 ++--
.../processor/internals/StreamsAssignmentScaleTest.java | 12 ++++++------
.../processor/internals/StreamsPartitionAssignorTest.java | 6 +++---
.../kafka/streams/processor/internals/TaskSuite.java | 4 ++--
...AssignorTest.java => LegacyStickyTaskAssignorTest.java} | 14 +++++++-------
.../tests/streams/streams_broker_down_resilience_test.py | 4 ++--
.../tests/streams/streams_standby_replica_test.py | 2 +-
10 files changed, 37 insertions(+), 32 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index e0ddd088377..4a348e194d1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -59,10 +59,10 @@ import
org.apache.kafka.streams.processor.internals.assignment.ClientState;
import
org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import
org.apache.kafka.streams.processor.internals.assignment.DefaultTaskTopicPartition;
import
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import
org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackUtils;
import
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
-import
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo;
@@ -861,8 +861,8 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
private LegacyTaskAssignor createTaskAssignor(final boolean
lagComputationSuccessful) {
final LegacyTaskAssignor taskAssignor =
legacyTaskAssignorSupplier.get();
- if (taskAssignor instanceof StickyTaskAssignor) {
- // special case: to preserve pre-existing behavior, we invoke the
StickyTaskAssignor
+ if (taskAssignor instanceof LegacyStickyTaskAssignor) {
+ // special case: to preserve pre-existing behavior, we invoke the
LegacyStickyTaskAssignor
// whether or not lag computation failed.
return taskAssignor;
} else if (lagComputationSuccessful) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
index d7a07c6e200..c3d6b7880ed 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
@@ -32,10 +32,10 @@ import
org.apache.kafka.streams.processor.assignment.ProcessId;
* 2. always return true, indicating that a follow-up rebalance is needed
*/
public class FallbackPriorTaskAssignor implements LegacyTaskAssignor {
- private final StickyTaskAssignor delegate;
+ private final LegacyStickyTaskAssignor delegate;
public FallbackPriorTaskAssignor() {
- delegate = new StickyTaskAssignor(true);
+ delegate = new LegacyStickyTaskAssignor(true);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignor.java
similarity index 95%
rename from
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
rename to
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignor.java
index 94c5dfce184..c61759109e1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignor.java
@@ -40,9 +40,14 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
-public class StickyTaskAssignor implements LegacyTaskAssignor {
+// Note: as of 3.8, this class has been renamed from StickyTaskAssignor to
LegacyStickyTaskAssignor,
+// and a new StickyTaskAssignor implementation was added that implements the
new TaskAssignor interface.
+// If you were previously plugging in the old StickyTaskAssignor via the
internal.task.assignor.class config,
+// you should migrate to the new TaskAssignor interface by removing the
internal config and instead
+// passing in the new StickyTaskAssignor class to the new public
task.assignor.class config
+public class LegacyStickyTaskAssignor implements LegacyTaskAssignor {
- private static final Logger log =
LoggerFactory.getLogger(StickyTaskAssignor.class);
+ private static final Logger log =
LoggerFactory.getLogger(LegacyStickyTaskAssignor.class);
// For stateful tasks, by default we want to maintain stickiness. So we
have higher non_overlap_cost
private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
@@ -59,11 +64,11 @@ public class StickyTaskAssignor implements
LegacyTaskAssignor {
private final boolean mustPreserveActiveTaskAssignment;
- public StickyTaskAssignor() {
+ public LegacyStickyTaskAssignor() {
this(false);
}
- StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
+ LegacyStickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
this.mustPreserveActiveTaskAssignment =
mustPreserveActiveTaskAssignment;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 28430a27e5d..765c1fdd442 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1460,8 +1460,8 @@ public class StreamsConfigTest {
@Test
public void shouldReturnTaskAssignorClass() {
- props.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG,
"StickyTaskAssignor");
- assertEquals("StickyTaskAssignor", new
StreamsConfig(props).getString(TASK_ASSIGNOR_CLASS_CONFIG));
+ props.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG,
"LegacyStickyTaskAssignor");
+ assertEquals("LegacyStickyTaskAssignor", new
StreamsConfig(props).getString(TASK_ASSIGNOR_CLASS_CONFIG));
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
index 4c7e2c78d01..369362f7f04 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
@@ -34,8 +34,8 @@ import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import
org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
-import
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -102,26 +102,26 @@ public class StreamsAssignmentScaleTest {
completeLargeAssignment(1_000, 10, 1000, 1,
HighAvailabilityTaskAssignor.class);
}
- /* StickyTaskAssignor tests */
+ /* LegacyStickyTaskAssignor tests */
@Test(timeout = 300 * 1000)
public void testStickyTaskAssignorLargePartitionCount() {
- completeLargeAssignment(2_000, 2, 1, 1, StickyTaskAssignor.class);
+ completeLargeAssignment(2_000, 2, 1, 1,
LegacyStickyTaskAssignor.class);
}
@Test(timeout = 300 * 1000)
public void testStickyTaskAssignorLargeNumConsumers() {
- completeLargeAssignment(1_000, 1_000, 1, 1, StickyTaskAssignor.class);
+ completeLargeAssignment(1_000, 1_000, 1, 1,
LegacyStickyTaskAssignor.class);
}
@Test(timeout = 300 * 1000)
public void testStickyTaskAssignorManyStandbys() {
- completeLargeAssignment(1_000, 100, 1, 20, StickyTaskAssignor.class);
+ completeLargeAssignment(1_000, 100, 1, 20,
LegacyStickyTaskAssignor.class);
}
@Test(timeout = 300 * 1000)
public void testStickyTaskAssignorManyThreadsPerClient() {
- completeLargeAssignment(1_000, 10, 1000, 1, StickyTaskAssignor.class);
+ completeLargeAssignment(1_000, 10, 1000, 1,
LegacyStickyTaskAssignor.class);
}
/* FallbackPriorTaskAssignor tests */
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index ac5b3180081..12636c71fdb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -67,8 +67,8 @@ import
org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import
org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
-import
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.streams.state.HostInfo;
@@ -344,8 +344,8 @@ public class StreamsPartitionAssignorTest {
return asList(
new Object[]{HighAvailabilityTaskAssignor.class, true, null},
new Object[]{HighAvailabilityTaskAssignor.class, false, null},
- new Object[]{StickyTaskAssignor.class, true, null},
- new Object[]{StickyTaskAssignor.class, false, null},
+ new Object[]{LegacyStickyTaskAssignor.class, true, null},
+ new Object[]{LegacyStickyTaskAssignor.class, false, null},
new Object[]{FallbackPriorTaskAssignor.class, true, null},
new Object[]{FallbackPriorTaskAssignor.class, false, null},
new Object[]{null, false,
org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor.class},
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
index 25c4d71d41a..c963b549057 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
@@ -17,8 +17,8 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest;
+import
org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignorTest;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetricsTest;
-import
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -35,7 +35,7 @@ import org.junit.runners.Suite;
GlobalStateTaskTest.class,
TaskManagerTest.class,
TaskMetricsTest.class,
- StickyTaskAssignorTest.class,
+ LegacyStickyTaskAssignorTest.class,
StreamsPartitionAssignorTest.class,
StandbyTaskCreationIntegrationTest.class,
})
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
similarity index 99%
rename from
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
rename to
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
index e9b1be48fbf..e57e91bf0a9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
@@ -104,7 +104,7 @@ import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.spy;
@RunWith(Parameterized.class)
-public class StickyTaskAssignorTest {
+public class LegacyStickyTaskAssignorTest {
private final List<Integer> expectedTopicGroupIds = asList(1, 2);
private final Time time = new MockTime();
@@ -807,7 +807,7 @@ public class StickyTaskAssignorTest {
time
);
- final boolean probingRebalanceNeeded = new
StickyTaskAssignor(true).assign(
+ final boolean probingRebalanceNeeded = new
LegacyStickyTaskAssignor(true).assign(
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
@@ -857,7 +857,7 @@ public class StickyTaskAssignorTest {
time
);
- final boolean probingRebalanceNeeded = new StickyTaskAssignor().assign(
+ final boolean probingRebalanceNeeded = new
LegacyStickyTaskAssignor().assign(
clients,
new HashSet<>(taskIds),
new HashSet<>(statefulTaskIds),
@@ -937,7 +937,7 @@ public class StickyTaskAssignorTest {
tpSize, partitionSize, maxCapacity, false, statefulTasks);
- final boolean probing = new StickyTaskAssignor().assign(
+ final boolean probing = new LegacyStickyTaskAssignor().assign(
clientStateMap,
taskIds,
statefulTasks,
@@ -1005,7 +1005,7 @@ public class StickyTaskAssignorTest {
final SortedMap<ProcessId, ClientState> clientStateMap =
getRandomClientState(clientSize,
tpSize, partitionSize, maxCapacity, false, statefulTasks);
- new StickyTaskAssignor().assign(
+ new LegacyStickyTaskAssignor().assign(
clientStateMap,
taskIds,
statefulTasks,
@@ -1047,7 +1047,7 @@ public class StickyTaskAssignorTest {
time
));
- new StickyTaskAssignor().assign(
+ new LegacyStickyTaskAssignor().assign(
clientStateMapCopy,
taskIds,
statefulTasks,
@@ -1085,7 +1085,7 @@ public class StickyTaskAssignorTest {
private boolean assign(final AssignmentConfigs configs, final
RackAwareTaskAssignor rackAwareTaskAssignor, final TaskId... tasks) {
final List<TaskId> taskIds = asList(tasks);
Collections.shuffle(taskIds);
- return new StickyTaskAssignor().assign(
+ return new LegacyStickyTaskAssignor().assign(
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
diff --git
a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 5057c3d2793..9eeeea3c98e 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -157,7 +157,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
# TODO KIP-441: consider rewriting the test for
HighAvailabilityTaskAssignor
configs = self.get_configs(
extra_configs=",application.id=shutdown_with_broker_down" +
-
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
+
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
)
processor = StreamsBrokerDownResilienceService(self.test_context,
self.kafka, configs)
@@ -236,7 +236,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
# TODO KIP-441: consider rewriting the test for
HighAvailabilityTaskAssignor
configs = self.get_configs(
extra_configs=",application.id=failover_with_broker_down" +
-
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
+
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
)
processor = StreamsBrokerDownResilienceService(self.test_context,
self.kafka, configs)
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index 78df8d3ebc3..d9804408df8 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -51,7 +51,7 @@ class StreamsStandbyTask(BaseStreamsTest):
def test_standby_tasks_rebalance(self, metadata_quorum,
use_new_coordinator=False):
# TODO KIP-441: consider rewriting the test for
HighAvailabilityTaskAssignor
configs = self.get_configs(
-
",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
% (
+
",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
% (
self.streams_source_topic,
self.streams_sink_topic_1,
self.streams_sink_topic_2