lihaosky commented on code in PR #14139:
URL: https://github.com/apache/kafka/pull/14139#discussion_r1286439107
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java:
##########
@@ -56,52 +67,118 @@
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClusterForAllTopics;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getProcessRacksForAllProcess;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomClientState;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomCluster;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomProcessRacks;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskChangelogMapForAllTasks;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMap;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTopologyGroupTaskMap;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForChangelog;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForRandomChangelog;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.verifyStandbySatisfyRackReplica;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.fail;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(Parameterized.class)
public class HighAvailabilityTaskAssignorTest {
- private final AssignmentConfigs configWithoutStandbys = new
AssignmentConfigs(
- /*acceptableRecoveryLag*/ 100L,
- /*maxWarmupReplicas*/ 2,
- /*numStandbyReplicas*/ 0,
- /*probingRebalanceIntervalMs*/ 60 * 1000L,
- /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- );
-
- private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
- /*acceptableRecoveryLag*/ 100L,
- /*maxWarmupReplicas*/ 2,
- /*numStandbyReplicas*/ 1,
- /*probingRebalanceIntervalMs*/ 60 * 1000L,
- /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- );
+ private AssignmentConfigs getConfigWithoutStandbys() {
+ return new AssignmentConfigs(
+ /*acceptableRecoveryLag*/ 100L,
+ /*maxWarmupReplicas*/ 2,
+ /*numStandbyReplicas*/ 0,
+ /*probingRebalanceIntervalMs*/ 60 * 1000L,
+ /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
+ null,
+ null,
+ rackAwareStrategy
+ );
+ }
+
+ private AssignmentConfigs getConfigWithStandbys() {
+ return getConfigWithStandbys(1);
+ }
+
+ private AssignmentConfigs getConfigWithStandbys(final int replicaNum) {
+ return new AssignmentConfigs(
+ /*acceptableRecoveryLag*/ 100L,
+ /*maxWarmupReplicas*/ 2,
+ /*numStandbyReplicas*/ replicaNum,
+ /*probingRebalanceIntervalMs*/ 60 * 1000L,
+ /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
+ null,
+ null,
+ rackAwareStrategy
+ );
+ }
+
+ @Parameter
+ public boolean enableRackAwareTaskAssignor;
+
+ private String rackAwareStrategy =
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE;
+
+ @Before
+ public void setUp() {
+ if (enableRackAwareTaskAssignor) {
+ rackAwareStrategy =
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC;
+ }
+ }
+
+ @Parameterized.Parameters(name = "enableRackAwareTaskAssignor={0}")
+ public static Collection<Object[]> getParamStoreType() {
+ return asList(new Object[][] {
+ {true},
+ {false}
+ });
+ }
@Test
public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2,
TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
- final ClientState clientState1 = new ClientState(allTaskIds,
emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)),
EMPTY_CLIENT_TAGS, 1);
- final ClientState clientState2 = new ClientState(emptySet(),
allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)),
EMPTY_CLIENT_TAGS, 1);
- final ClientState clientState3 = new ClientState(emptySet(),
emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k ->
Long.MAX_VALUE)), EMPTY_CLIENT_TAGS, 1);
+ final ClientState clientState1 = new ClientState(allTaskIds,
emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)),
EMPTY_CLIENT_TAGS, 1, UUID_1);
+ final ClientState clientState2 = new ClientState(emptySet(),
allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)),
EMPTY_CLIENT_TAGS, 1, UUID_2);
+ final ClientState clientState3 = new ClientState(emptySet(),
emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k ->
Long.MAX_VALUE)), EMPTY_CLIENT_TAGS, 1, UUID_3);
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, clientState1),
mkEntry(UUID_2, clientState2),
mkEntry(UUID_3, clientState3)
);
+ final AssignmentConfigs configs = new AssignmentConfigs(
+ 11L,
+ 2,
+ 1,
+ 60_000L,
+ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
+ null,
+ null,
+ rackAwareStrategy
+ );
+ final RackAwareTaskAssignor rackAwareTaskAssignor =
getRackAwareTaskAssignor(configs);
+
final boolean unstable = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTaskIds,
allTaskIds,
- new AssignmentConfigs(11L, 2, 1, 60_000L,
EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ Optional.of(rackAwareTaskAssignor),
Review Comment:
Yeah. I can remove the null check in my next pr which modifies all tests for
HAAssignor
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]