mjsax commented on code in PR #14139:
URL: https://github.com/apache/kafka/pull/14139#discussion_r1286401088
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##########
@@ -145,6 +163,14 @@ private void assignStandbyReplicaTasks(final TreeMap<UUID,
ClientState> clientSt
ClientState::assignStandby,
standbyTaskAssignor::isAllowedTaskMovement
);
+
+ if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent()
&& rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {
Review Comment:
As above.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##########
@@ -208,19 +234,27 @@ private static boolean shouldMoveATask(final ClientState
sourceClientState,
}
private static void assignStatelessActiveTasks(final TreeMap<UUID,
ClientState> clientStates,
- final Iterable<TaskId>
statelessTasks) {
+ final Iterable<TaskId>
statelessTasks,
+ final
Optional<RackAwareTaskAssignor> rackAwareTaskAssignor) {
final ConstrainedPrioritySet statelessActiveTaskClientsByTaskLoad =
new ConstrainedPrioritySet(
(client, task) -> true,
client -> clientStates.get(client).activeTaskLoad()
);
statelessActiveTaskClientsByTaskLoad.offerAll(clientStates.keySet());
+ final SortedSet<TaskId> sortedTasks = new TreeSet<>();
for (final TaskId task : statelessTasks) {
+ sortedTasks.add(task);
final UUID client =
statelessActiveTaskClientsByTaskLoad.poll(task);
final ClientState state = clientStates.get(client);
state.assignActive(task);
statelessActiveTaskClientsByTaskLoad.offer(client);
}
+
+ if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent()
&& rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {
Review Comment:
As above.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java:
##########
@@ -56,52 +67,118 @@
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClusterForAllTopics;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getProcessRacksForAllProcess;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomClientState;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomCluster;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomProcessRacks;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskChangelogMapForAllTasks;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMap;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTopologyGroupTaskMap;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForChangelog;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForRandomChangelog;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.verifyStandbySatisfyRackReplica;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.fail;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(Parameterized.class)
public class HighAvailabilityTaskAssignorTest {
- private final AssignmentConfigs configWithoutStandbys = new
AssignmentConfigs(
- /*acceptableRecoveryLag*/ 100L,
- /*maxWarmupReplicas*/ 2,
- /*numStandbyReplicas*/ 0,
- /*probingRebalanceIntervalMs*/ 60 * 1000L,
- /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- );
-
- private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
- /*acceptableRecoveryLag*/ 100L,
- /*maxWarmupReplicas*/ 2,
- /*numStandbyReplicas*/ 1,
- /*probingRebalanceIntervalMs*/ 60 * 1000L,
- /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- );
+ private AssignmentConfigs getConfigWithoutStandbys() {
+ return new AssignmentConfigs(
+ /*acceptableRecoveryLag*/ 100L,
+ /*maxWarmupReplicas*/ 2,
+ /*numStandbyReplicas*/ 0,
+ /*probingRebalanceIntervalMs*/ 60 * 1000L,
+ /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
+ null,
+ null,
+ rackAwareStrategy
+ );
+ }
+
+ private AssignmentConfigs getConfigWithStandbys() {
+ return getConfigWithStandbys(1);
+ }
+
+ private AssignmentConfigs getConfigWithStandbys(final int replicaNum) {
+ return new AssignmentConfigs(
+ /*acceptableRecoveryLag*/ 100L,
+ /*maxWarmupReplicas*/ 2,
+ /*numStandbyReplicas*/ replicaNum,
+ /*probingRebalanceIntervalMs*/ 60 * 1000L,
+ /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
+ null,
+ null,
+ rackAwareStrategy
+ );
+ }
+
+ @Parameter
+ public boolean enableRackAwareTaskAssignor;
+
+ private String rackAwareStrategy =
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE;
+
+ @Before
+ public void setUp() {
+ if (enableRackAwareTaskAssignor) {
+ rackAwareStrategy =
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC;
+ }
+ }
+
+ @Parameterized.Parameters(name = "enableRackAwareTaskAssignor={0}")
+ public static Collection<Object[]> getParamStoreType() {
+ return asList(new Object[][] {
+ {true},
+ {false}
+ });
+ }
@Test
public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2,
TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
- final ClientState clientState1 = new ClientState(allTaskIds,
emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)),
EMPTY_CLIENT_TAGS, 1);
- final ClientState clientState2 = new ClientState(emptySet(),
allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)),
EMPTY_CLIENT_TAGS, 1);
- final ClientState clientState3 = new ClientState(emptySet(),
emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k ->
Long.MAX_VALUE)), EMPTY_CLIENT_TAGS, 1);
+ final ClientState clientState1 = new ClientState(allTaskIds,
emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)),
EMPTY_CLIENT_TAGS, 1, UUID_1);
+ final ClientState clientState2 = new ClientState(emptySet(),
allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)),
EMPTY_CLIENT_TAGS, 1, UUID_2);
+ final ClientState clientState3 = new ClientState(emptySet(),
emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k ->
Long.MAX_VALUE)), EMPTY_CLIENT_TAGS, 1, UUID_3);
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, clientState1),
mkEntry(UUID_2, clientState2),
mkEntry(UUID_3, clientState3)
);
+ final AssignmentConfigs configs = new AssignmentConfigs(
+ 11L,
+ 2,
+ 1,
+ 60_000L,
+ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
+ null,
+ null,
+ rackAwareStrategy
+ );
+ final RackAwareTaskAssignor rackAwareTaskAssignor =
getRackAwareTaskAssignor(configs);
+
final boolean unstable = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTaskIds,
allTaskIds,
- new AssignmentConfigs(11L, 2, 1, 60_000L,
EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ Optional.of(rackAwareTaskAssignor),
Review Comment:
This aligns with (and triggered) my comments above -- if we can always pass a
rack aware assignor, as it's enabled/disabled by the `config` anyway, we should
not make it optional, and should instead require that it's non-null (and
otherwise crash, as that would indicate a bug).
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##########
@@ -124,11 +133,20 @@ private static void assignActiveStatefulTasks(final
SortedMap<UUID, ClientState>
ClientState::assignActive,
(source, destination) -> true
);
+
+ if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent()
&& rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {
Review Comment:
Just wondering why we would ever pass in `null`? The rack aware assignor is
an internal class, so we can always construct it, and in the end we make a
decision about using it or not via `canEnableRackAwareAssignor()`, which
consults the corresponding config anyway? -- It seems simpler to always pass an
instance of the rack aware assignor and simplify the code here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]