This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 103ff5c0f06 KAFKA-15045: (KIP-924 pt. 24) internal TaskAssignor rename
to LegacyTaskAssignor (#16318)
103ff5c0f06 is described below
commit 103ff5c0f06a35c725c87d534ad591e9e95534ff
Author: Antoine Pourchet <[email protected]>
AuthorDate: Thu Jun 13 01:32:39 2024 -0600
KAFKA-15045: (KIP-924 pt. 24) internal TaskAssignor rename to
LegacyTaskAssignor (#16318)
Since the new public TaskAssignor API shares its name with the internal
TaskAssignor interface, this rename prevents users from confusing the
internal definition with the public one.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../processor/internals/StreamsPartitionAssignor.java | 12 ++++++------
.../internals/assignment/AssignorConfiguration.java | 4 ++--
.../internals/assignment/FallbackPriorTaskAssignor.java | 2 +-
.../assignment/HighAvailabilityTaskAssignor.java | 2 +-
.../{TaskAssignor.java => LegacyTaskAssignor.java} | 2 +-
.../internals/assignment/StandbyTaskAssignor.java | 2 +-
.../internals/assignment/StickyTaskAssignor.java | 2 +-
.../integration/TaskAssignorIntegrationTest.java | 17 +++++++++--------
.../processor/internals/StreamsAssignmentScaleTest.java | 4 ++--
.../internals/StreamsPartitionAssignorTest.java | 6 +++---
10 files changed, 27 insertions(+), 26 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 887ef86faf5..e0ddd088377 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -64,7 +64,7 @@ import
org.apache.kafka.streams.processor.internals.assignment.RackUtils;
import
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
@@ -224,7 +224,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
private
Supplier<Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor>>
customTaskAssignorSupplier;
- private Supplier<TaskAssignor> internalTaskAssignorSupplier;
+ private Supplier<LegacyTaskAssignor> legacyTaskAssignorSupplier;
private byte uniqueField;
private Map<String, String> clientTags;
@@ -259,7 +259,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
copartitionedTopicsEnforcer =
assignorConfiguration.copartitionedTopicsEnforcer();
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
customTaskAssignorSupplier = assignorConfiguration::customTaskAssignor;
- internalTaskAssignorSupplier = assignorConfiguration::taskAssignor;
+ legacyTaskAssignorSupplier = assignorConfiguration::taskAssignor;
assignmentListener = assignorConfiguration.assignmentListener();
uniqueField = 0;
clientTags = referenceContainer.clientTags;
@@ -817,7 +817,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
};
} else {
customTaskAssignmentListener = (assignment, subscription) -> { };
- final TaskAssignor taskAssignor =
createTaskAssignor(lagComputationSuccessful);
+ final LegacyTaskAssignor taskAssignor =
createTaskAssignor(lagComputationSuccessful);
final RackAwareTaskAssignor rackAwareTaskAssignor = new
RackAwareTaskAssignor(
fullMetadata,
partitionsForTask,
@@ -859,8 +859,8 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
return customTaskAssignmentListener;
}
- private TaskAssignor createTaskAssignor(final boolean
lagComputationSuccessful) {
- final TaskAssignor taskAssignor = internalTaskAssignorSupplier.get();
+ private LegacyTaskAssignor createTaskAssignor(final boolean
lagComputationSuccessful) {
+ final LegacyTaskAssignor taskAssignor =
legacyTaskAssignorSupplier.get();
if (taskAssignor instanceof StickyTaskAssignor) {
// special case: to preserve pre-existing behavior, we invoke the
StickyTaskAssignor
// whether or not lag computation failed.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 1971642942a..311863630af 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -242,9 +242,9 @@ public final class AssignorConfiguration {
return AssignmentConfigs.of(streamsConfig);
}
- public TaskAssignor taskAssignor() {
+ public LegacyTaskAssignor taskAssignor() {
try {
- return Utils.newInstance(internalTaskAssignorClass,
TaskAssignor.class);
+ return Utils.newInstance(internalTaskAssignorClass,
LegacyTaskAssignor.class);
} catch (final ClassNotFoundException e) {
throw new IllegalArgumentException(
"Expected an instantiable class name for " +
INTERNAL_TASK_ASSIGNOR_CLASS,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
index 93dcd6192b6..d7a07c6e200 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
@@ -31,7 +31,7 @@ import
org.apache.kafka.streams.processor.assignment.ProcessId;
* 1. ignore the task lags in the ClientState map
* 2. always return true, indicating that a follow-up rebalance is needed
*/
-public class FallbackPriorTaskAssignor implements TaskAssignor {
+public class FallbackPriorTaskAssignor implements LegacyTaskAssignor {
private final StickyTaskAssignor delegate;
public FallbackPriorTaskAssignor() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
index bda199ea335..272cd824b43 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
@@ -43,7 +43,7 @@ import static
org.apache.kafka.streams.processor.internals.assignment.RackAwareT
import static
org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignActiveTaskMovements;
import static
org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignStandbyTaskMovements;
-public class HighAvailabilityTaskAssignor implements TaskAssignor {
+public class HighAvailabilityTaskAssignor implements LegacyTaskAssignor {
private static final Logger log =
LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
public static final int DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST = 10;
public static final int DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST = 1;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyTaskAssignor.java
similarity index 97%
rename from
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
rename to
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyTaskAssignor.java
index e01c13b59da..9a7485b64aa 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyTaskAssignor.java
@@ -23,7 +23,7 @@ import java.util.Set;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.ProcessId;
-public interface TaskAssignor {
+public interface LegacyTaskAssignor {
/**
* @return whether the generated assignment requires a followup probing
rebalance to satisfy all conditions
*/
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java
index 022eaae078c..4787ac65523 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.ProcessId;
-interface StandbyTaskAssignor extends TaskAssignor {
+interface StandbyTaskAssignor extends LegacyTaskAssignor {
default boolean isAllowedTaskMovement(final ClientState source, final
ClientState destination) {
return true;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index b6953613d89..94c5dfce184 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -40,7 +40,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
-public class StickyTaskAssignor implements TaskAssignor {
+public class StickyTaskAssignor implements LegacyTaskAssignor {
private static final Logger log =
LoggerFactory.getLogger(StickyTaskAssignor.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
index 9d7d298f831..3e29773a151 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
@@ -30,7 +30,7 @@ import
org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
import
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
@@ -78,7 +78,8 @@ public class TaskAssignorIntegrationTest {
public TestName testName = new TestName();
// Just a dummy implementation so we can check the config
- public static final class MyTaskAssignor extends
HighAvailabilityTaskAssignor implements TaskAssignor { }
+ public static final class MyLegacyTaskAssignor extends
HighAvailabilityTaskAssignor implements
+ LegacyTaskAssignor { }
@SuppressWarnings("unchecked")
@Test
@@ -116,7 +117,7 @@ public class TaskAssignorIntegrationTest {
mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG,
"480000"),
mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER,
configuredAssignmentListener),
-
mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS,
MyTaskAssignor.class.getName())
+
mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS,
MyLegacyTaskAssignor.class.getName())
)
);
@@ -153,18 +154,18 @@ public class TaskAssignorIntegrationTest {
assignmentListenerField.setAccessible(true);
final AssignmentListener actualAssignmentListener =
(AssignmentListener) assignmentListenerField.get(streamsPartitionAssignor);
- final Field taskAssignorSupplierField =
StreamsPartitionAssignor.class.getDeclaredField("internalTaskAssignorSupplier");
+ final Field taskAssignorSupplierField =
StreamsPartitionAssignor.class.getDeclaredField("legacyTaskAssignorSupplier");
taskAssignorSupplierField.setAccessible(true);
- final Supplier<TaskAssignor> taskAssignorSupplier =
- (Supplier<TaskAssignor>)
taskAssignorSupplierField.get(streamsPartitionAssignor);
- final TaskAssignor taskAssignor = taskAssignorSupplier.get();
+ final Supplier<LegacyTaskAssignor> taskAssignorSupplier =
+ (Supplier<LegacyTaskAssignor>)
taskAssignorSupplierField.get(streamsPartitionAssignor);
+ final LegacyTaskAssignor taskAssignor = taskAssignorSupplier.get();
assertThat(configs.numStandbyReplicas(), is(5));
assertThat(configs.acceptableRecoveryLag(), is(6L));
assertThat(configs.maxWarmupReplicas(), is(7));
assertThat(configs.probingRebalanceIntervalMs(), is(480000L));
assertThat(actualAssignmentListener,
sameInstance(configuredAssignmentListener));
- assertThat(taskAssignor, instanceOf(MyTaskAssignor.class));
+ assertThat(taskAssignor, instanceOf(MyLegacyTaskAssignor.class));
}
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
index b3c7131341b..c782eb3dbf6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
@@ -36,7 +36,7 @@ import
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTask
import
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockClientSupplier;
@@ -150,7 +150,7 @@ public class StreamsAssignmentScaleTest {
final int numClients,
final int numThreadsPerClient,
final int numStandbys,
- final Class<? extends TaskAssignor>
taskAssignor) {
+ final Class<? extends
LegacyTaskAssignor> taskAssignor) {
final List<String> topic = singletonList("topic");
final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 43c07ee3c28..ac5b3180081 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -70,7 +70,7 @@ import
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityT
import
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import
org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockClientSupplier;
@@ -237,7 +237,7 @@ public class StreamsPartitionAssignorTest {
@Captor
private ArgumentCaptor<Map<TopicPartition, PartitionInfo>>
topicPartitionInfoCaptor;
private final Map<String, Subscription> subscriptions = new HashMap<>();
- private final Class<? extends TaskAssignor> internalTaskAssignor;
+ private final Class<? extends LegacyTaskAssignor> internalTaskAssignor;
private final Class<? extends
org.apache.kafka.streams.processor.assignment.TaskAssignor> customTaskAssignor;
private final String rackAwareAssignorStrategy;
private Map<String, String> clientTags;
@@ -354,7 +354,7 @@ public class StreamsPartitionAssignorTest {
);
}
- public StreamsPartitionAssignorTest(final Class<? extends TaskAssignor>
internalTaskAssignor,
+ public StreamsPartitionAssignorTest(final Class<? extends
LegacyTaskAssignor> internalTaskAssignor,
final boolean enableRackAwareAssignor,
final Class<? extends
org.apache.kafka.streams.processor.assignment.TaskAssignor> customTaskAssignor)
{
this.internalTaskAssignor = internalTaskAssignor;