This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 87264e67143 KAFKA-15045: (KIP-924 pt. 23) More TaskAssignmentUtils tests (#16292)
87264e67143 is described below

commit 87264e671433c7632c1cc6872289ad77a740d9f1
Author: Antoine Pourchet <[email protected]>
AuthorDate: Wed Jun 12 15:25:47 2024 -0600

    KAFKA-15045: (KIP-924 pt. 23) More TaskAssignmentUtils tests (#16292)
    
    Also moved the assignment validation test from StreamsPartitionAssignorTest to TaskAssignmentUtilsTest.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../processor/assignment/AssignmentConfigs.java    |  24 +--
 .../assignment/KafkaStreamsAssignment.java         |  16 ++
 .../streams/processor/assignment/ProcessId.java    |   8 +
 .../processor/assignment/TaskAssignmentUtils.java  |   8 +-
 .../assignment/assignors/StickyTaskAssignor.java   |  23 +-
 .../internals/StreamsPartitionAssignor.java        |   3 +-
 .../internals/StreamsPartitionAssignorTest.java    | 126 -----------
 .../processor/internals/TaskManagerTest.java       |   3 +-
 .../assignment/KafkaStreamsAssignmentTest.java     |  49 +++++
 .../assignment/KafkaStreamsStateTest.java          |   5 +-
 .../assignment/TaskAssignmentUtilsTest.java        | 233 ++++++++++++++++++++-
 11 files changed, 333 insertions(+), 165 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java
index 6a7ca68a50f..abd9d50b8f4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java
@@ -20,8 +20,6 @@ import java.util.List;
 import java.util.OptionalInt;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.streams.StreamsConfig;
-import 
org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor;
-import 
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
 
 /**
  * Assignment related configs for the Kafka Streams {@link TaskAssignor}.
@@ -43,26 +41,8 @@ public class AssignmentConfigs {
         final long probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
         final List<String> rackAwareAssignmentTags = 
configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
         final String rackAwareAssignmentStrategy = 
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
-        Integer rackAwareTrafficCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
-        Integer rackAwareNonOverlapCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
-
-        final String assignorClassName = 
configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
-        if (StickyTaskAssignor.class.getName().equals(assignorClassName)) {
-            if (rackAwareTrafficCost == null) {
-                rackAwareTrafficCost = 
StickyTaskAssignor.DEFAULT_STICKY_TRAFFIC_COST;
-            }
-            if (rackAwareNonOverlapCost == null) {
-                rackAwareNonOverlapCost = 
StickyTaskAssignor.DEFAULT_STICKY_NON_OVERLAP_COST;
-            }
-        } else if 
(HighAvailabilityTaskAssignor.class.getName().equals(assignorClassName)) {
-            // TODO KAFKA-16869: replace with the HighAvailabilityTaskAssignor 
class once it implements the new TaskAssignor interface
-            if (rackAwareTrafficCost == null) {
-                rackAwareTrafficCost = 
HighAvailabilityTaskAssignor.DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST;
-            }
-            if (rackAwareNonOverlapCost == null) {
-                rackAwareNonOverlapCost = 
HighAvailabilityTaskAssignor.DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST;
-            }
-        }
+        final Integer rackAwareTrafficCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
+        final Integer rackAwareNonOverlapCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
 
         return new AssignmentConfigs(
             acceptableRecoveryLag,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java
index f5205c8422b..848219d8c72 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.assignment;
 import static java.util.Collections.unmodifiableMap;
 
 import java.time.Instant;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -108,6 +109,16 @@ public class KafkaStreamsAssignment {
         return followupRebalanceDeadline;
     }
 
+    @Override
+    public String toString() {
+        return String.format(
+            "KafkaStreamsAssignment{%s, %s, %s}",
+            processId,
+            Arrays.toString(tasks.values().toArray(new AssignedTask[0])),
+            followupRebalanceDeadline
+        );
+    }
+
     public static class AssignedTask {
         private final TaskId id;
         private final Type taskType;
@@ -157,5 +168,10 @@ public class KafkaStreamsAssignment {
             final AssignedTask other = (AssignedTask) obj;
             return this.id.equals(other.id()) && this.taskType == 
other.taskType;
         }
+
+        @Override
+        public String toString() {
+            return String.format("AssignedTask{%s, %s}", taskType, id);
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java
index 9dd4025112a..0a3c2c2bfb4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java
@@ -35,6 +35,14 @@ public class ProcessId implements Comparable<ProcessId> {
         return id;
     }
 
+    /**
+     *
+     * @return a randomly generated process id
+     */
+    public static ProcessId randomProcessId() {
+        return new ProcessId(UUID.randomUUID());
+    }
+
     @Override
     public String toString() {
         return id.toString();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
index 39a698adfa5..9af8a10cbd6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
@@ -59,16 +59,16 @@ public final class TaskAssignmentUtils {
      * A simple config container for necessary parameters and optional 
overrides to apply when
      * running the active or standby task rack-aware optimizations.
      */
-    public static class RackAwareOptimizationParams {
+    public static final class RackAwareOptimizationParams {
         private final ApplicationState applicationState;
         private final Optional<Integer> trafficCostOverride;
         private final Optional<Integer> nonOverlapCostOverride;
         private final Optional<SortedSet<TaskId>> tasksToOptimize;
 
         private RackAwareOptimizationParams(final ApplicationState 
applicationState,
-                                           final Optional<Integer> 
trafficCostOverride,
-                                           final Optional<Integer> 
nonOverlapCostOverride,
-                                           final Optional<SortedSet<TaskId>> 
tasksToOptimize) {
+                                            final Optional<Integer> 
trafficCostOverride,
+                                            final Optional<Integer> 
nonOverlapCostOverride,
+                                            final Optional<SortedSet<TaskId>> 
tasksToOptimize) {
             this.applicationState = applicationState;
             this.trafficCostOverride = trafficCostOverride;
             this.nonOverlapCostOverride = nonOverlapCostOverride;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
index 3d5e5b247f7..fe01b502a22 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
@@ -95,10 +95,15 @@ public class StickyTaskAssignor implements TaskAssignor {
 
         final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = 
assignmentState.newAssignments;
 
-        TaskAssignmentUtils.optimizeRackAwareActiveTasks(
-            
RackAwareOptimizationParams.of(applicationState).forStatefulTasks(),
-            currentAssignments
-        );
+        final RackAwareOptimizationParams statefulTaskParams = 
RackAwareOptimizationParams.of(applicationState)
+            .withTrafficCostOverride(
+                
applicationState.assignmentConfigs().rackAwareTrafficCost().orElse(DEFAULT_STICKY_TRAFFIC_COST)
+            )
+            .withNonOverlapCostOverride(
+                
applicationState.assignmentConfigs().rackAwareNonOverlapCost().orElse(DEFAULT_STICKY_NON_OVERLAP_COST)
+            )
+            .forStatefulTasks();
+        TaskAssignmentUtils.optimizeRackAwareActiveTasks(statefulTaskParams, 
currentAssignments);
 
         TaskAssignmentUtils.optimizeRackAwareActiveTasks(
             RackAwareOptimizationParams.of(applicationState)
@@ -120,7 +125,15 @@ public class StickyTaskAssignor implements TaskAssignor {
         }
 
         final Map<ProcessId, KafkaStreamsAssignment> assignments = 
assignmentState.newAssignments;
-        
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(RackAwareOptimizationParams.of(applicationState),
 assignments);
+
+        final RackAwareOptimizationParams optimizationParams = 
RackAwareOptimizationParams.of(applicationState)
+            .withTrafficCostOverride(
+                
applicationState.assignmentConfigs().rackAwareTrafficCost().orElse(DEFAULT_STICKY_TRAFFIC_COST)
+            )
+            .withNonOverlapCostOverride(
+                
applicationState.assignmentConfigs().rackAwareNonOverlapCost().orElse(DEFAULT_STICKY_NON_OVERLAP_COST)
+            );
+        TaskAssignmentUtils.optimizeRackAwareStandbyTasks(optimizationParams, 
assignments);
         assignmentState.processOptimizedAssignments(assignments);
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 38b164f1969..887ef86faf5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -93,7 +93,6 @@ import java.util.stream.Collectors;
 
 import static java.util.Collections.unmodifiableSet;
 import static java.util.Map.Entry.comparingByKey;
-import static java.util.UUID.randomUUID;
 import static org.apache.kafka.common.utils.Utils.filterMap;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsResult;
@@ -197,7 +196,7 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
     }
 
     // keep track of any future consumers in a "dummy" Client since we can't 
decipher their subscription
-    private static final ProcessId FUTURE_ID = new ProcessId(randomUUID());
+    private static final ProcessId FUTURE_ID = ProcessId.randomProcessId();
 
     protected static final Comparator<TopicPartition> PARTITION_COMPARATOR =
         
Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 9c7338f748d..43c07ee3c28 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
 
 import java.util.Arrays;
 import java.util.Optional;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -60,20 +59,12 @@ import 
org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.assignment.ApplicationState;
-import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
-import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
 import org.apache.kafka.streams.processor.assignment.ProcessId;
-import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
-import org.apache.kafka.streams.processor.assignment.TaskInfo;
 import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
-import 
org.apache.kafka.streams.processor.internals.assignment.DefaultApplicationState;
-import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo;
-import 
org.apache.kafka.streams.processor.internals.assignment.DefaultTaskTopicPartition;
 import 
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
@@ -2616,123 +2607,6 @@ public class StreamsPartitionAssignorTest {
         assertEquals(clientTags, partitionAssignor.clientTags());
     }
 
-    @Test
-    public void testValidateTaskAssignment() {
-        createDefaultMockTaskManager();
-        configureDefaultPartitionAssignor();
-
-        final StreamsConfig streamsConfig = new StreamsConfig(configProps());
-        final AssignmentConfigs assignmentConfigs = 
AssignmentConfigs.of(streamsConfig);
-        final Set<TaskInfo> tasks = mkSet(
-            new DefaultTaskInfo(
-                new TaskId(1, 1),
-                false,
-                mkSet(),
-                mkSet(
-                    new DefaultTaskTopicPartition(
-                        new TopicPartition("t1", 1),
-                        true,
-                        false,
-                        () -> { }
-                    )
-                )
-            )
-        );
-
-        final ProcessId clientUuid1 = new ProcessId(UUID.randomUUID());
-        final ProcessId clientUuid2 = new ProcessId(UUID.randomUUID());
-        final Map<ProcessId, StreamsPartitionAssignor.ClientMetadata> clients 
= mkMap(
-            mkEntry(clientUuid1, new 
StreamsPartitionAssignor.ClientMetadata(clientUuid1, "endpoint1:80", mkMap(), 
Optional.empty())),
-            mkEntry(clientUuid2, new 
StreamsPartitionAssignor.ClientMetadata(clientUuid1, "endpoint2:80", mkMap(), 
Optional.empty()))
-        );
-        final ApplicationState applicationState = new DefaultApplicationState(
-            assignmentConfigs,
-            tasks.stream().collect(Collectors.toMap(
-                TaskInfo::id,
-                t -> t
-            )),
-            clients
-        );
-
-        // ****
-        final 
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment 
noError = new 
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
-            mkSet(
-                KafkaStreamsAssignment.of(clientUuid1, mkSet(
-                    new KafkaStreamsAssignment.AssignedTask(
-                        new TaskId(1, 1), 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
-                    )
-                )),
-                KafkaStreamsAssignment.of(clientUuid2, mkSet())
-            )
-        );
-        
org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError 
error = TaskAssignmentUtils.validateTaskAssignment(applicationState, noError);
-        
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.NONE,
 error);
-
-        // ****
-        final 
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment 
missingProcessId = new 
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
-            mkSet(
-                KafkaStreamsAssignment.of(clientUuid1, mkSet(
-                    new KafkaStreamsAssignment.AssignedTask(
-                        new TaskId(1, 1), 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
-                    )
-                ))
-            )
-        );
-        error = TaskAssignmentUtils.validateTaskAssignment(applicationState, 
missingProcessId);
-        
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.MISSING_PROCESS_ID,
 error);
-
-        // ****
-        final 
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment 
unknownProcessId = new 
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
-            mkSet(
-                KafkaStreamsAssignment.of(clientUuid1, mkSet(
-                    new KafkaStreamsAssignment.AssignedTask(
-                        new TaskId(1, 1), 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
-                    )
-                )),
-                KafkaStreamsAssignment.of(clientUuid2, mkSet()),
-                KafkaStreamsAssignment.of(new ProcessId(UUID.randomUUID()), 
mkSet())
-            )
-        );
-        error = TaskAssignmentUtils.validateTaskAssignment(applicationState, 
unknownProcessId);
-        
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.UNKNOWN_PROCESS_ID,
 error);
-
-        // ****
-        final 
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment 
unknownTaskId = new 
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
-            mkSet(
-                KafkaStreamsAssignment.of(clientUuid1, mkSet(
-                    new KafkaStreamsAssignment.AssignedTask(
-                        new TaskId(1, 1), 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
-                    )
-                )),
-                KafkaStreamsAssignment.of(clientUuid2, mkSet(
-                    new KafkaStreamsAssignment.AssignedTask(
-                        new TaskId(13, 13), 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
-                    )
-                ))
-            )
-        );
-        error = TaskAssignmentUtils.validateTaskAssignment(applicationState, 
unknownTaskId);
-        
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.UNKNOWN_TASK_ID,
 error);
-
-        // ****
-        final 
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment 
activeTaskDuplicated = new 
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
-            mkSet(
-                KafkaStreamsAssignment.of(clientUuid1, mkSet(
-                    new KafkaStreamsAssignment.AssignedTask(
-                        new TaskId(1, 1), 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
-                    )
-                )),
-                KafkaStreamsAssignment.of(clientUuid2, mkSet(
-                    new KafkaStreamsAssignment.AssignedTask(
-                        new TaskId(1, 1), 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
-                    )
-                ))
-            )
-        );
-        error = TaskAssignmentUtils.validateTaskAssignment(applicationState, 
activeTaskDuplicated);
-        
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
 error);
-    }
-
     private static class CorruptedInternalTopologyBuilder extends 
InternalTopologyBuilder {
         private Map<Subtopology, TopicsInfo> corruptedTopicGroups;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index c6f568466e5..22fd4052b34 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.UUID;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.DeleteRecordsResult;
 import org.apache.kafka.clients.admin.DeletedRecords;
@@ -240,7 +239,7 @@ public class TaskManagerTest {
         final TaskManager taskManager = new TaskManager(
             time,
             changeLogReader,
-            new ProcessId(UUID.randomUUID()),
+            ProcessId.randomProcessId(),
             "taskManagerTest",
             activeTaskCreator,
             standbyTaskCreator,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsAssignmentTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsAssignmentTest.java
new file mode 100644
index 00000000000..ef595d142b6
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsAssignmentTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.processIdForInt;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
+import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.junit.Test;
+
+public class KafkaStreamsAssignmentTest {
+    @Test
+    public void shouldHaveReadableString() {
+        final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(
+            processIdForInt(1),
+            mkSet(
+                new AssignedTask(TASK_0_0, AssignedTask.Type.ACTIVE),
+                new AssignedTask(TASK_0_1, AssignedTask.Type.STANDBY),
+                new AssignedTask(TASK_0_2, AssignedTask.Type.ACTIVE)
+            )
+        );
+
+        assertThat(
+            assignment.toString(),
+            
equalTo("KafkaStreamsAssignment{00000000-0000-0000-0000-000000000001, "
+                    + "[AssignedTask{ACTIVE, 0_2}, AssignedTask{STANDBY, 0_1}, 
AssignedTask{ACTIVE, 0_0}], "
+                    + "Optional.empty}"));
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java
index a0b0c457a3b..b28cd3678a1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java
@@ -31,7 +31,6 @@ import java.util.Arrays;
 import java.util.Optional;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.UUID;
 import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
 import org.apache.kafka.streams.processor.assignment.ProcessId;
 import org.junit.Test;
@@ -40,7 +39,7 @@ public class KafkaStreamsStateTest {
     @Test
     public void shouldCorrectlyReturnTasksByLag() {
         final KafkaStreamsState state = new DefaultKafkaStreamsState(
-            new ProcessId(UUID.randomUUID()),
+            ProcessId.randomProcessId(),
             10,
             mkMap(),
             mkSortedSet(NAMED_TASK_T0_0_0, NAMED_TASK_T0_0_1),
@@ -71,7 +70,7 @@ public class KafkaStreamsStateTest {
     @Test
     public void shouldThrowExceptionOnLagOperationsIfLagsWereNotComputed() {
         final KafkaStreamsState state = new DefaultKafkaStreamsState(
-            new ProcessId(UUID.randomUUID()),
+            ProcessId.randomProcessId(),
             10,
             mkMap(),
             mkSortedSet(NAMED_TASK_T0_0_0, NAMED_TASK_T0_0_1),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
index 8eff5812284..2295a865aa7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
@@ -25,10 +25,12 @@ import static 
org.apache.kafka.streams.processor.internals.assignment.Assignment
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_3;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_4;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_5;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.processIdForInt;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -50,11 +52,12 @@ import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.Assi
 import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
 import org.apache.kafka.streams.processor.assignment.ProcessId;
 import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
+import org.apache.kafka.streams.processor.assignment.TaskAssignor;
 import 
org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils.RackAwareOptimizationParams;
 import org.apache.kafka.streams.processor.assignment.TaskInfo;
 import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
 import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.junit.rules.Timeout;
@@ -208,6 +211,222 @@ public class TaskAssignmentUtilsTest {
         assertThat(assignments.get(processId(5)).tasks().keySet(), 
equalTo(mkSet(TASK_0_0)));
     }
 
+    /**
+     * Runs the rack-aware standby optimization over an assignment in which all four clients
+     * share rack "r1" and differ only in their "az" client tag (client 4 repeats the value
+     * "2" used by client 2). The test expects every client to keep exactly the task ids it
+     * started with, and client 4 to stay empty. Only task ids are asserted, not whether a
+     * task is active or standby.
+     *
+     * @param strategy the rack-aware assignment strategy handed to the assignment configs
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY,
+    })
+    public void shouldNotViolateClientTagsAssignmentDuringStandbyOptimization(final String strategy) {
+        final AssignmentConfigs configs = defaultAssignmentConfigs(
+            strategy, 100, 1, 2, Collections.singletonList("az"));
+        final Map<TaskId, TaskInfo> taskInfos = mkMap(
+            mkTaskInfo(TASK_0_0, true, mkSet("r1")),
+            mkTaskInfo(TASK_0_1, true, mkSet("r1"))
+        );
+        // Same rack everywhere; the "az" tag is the only thing that tells the clients apart.
+        final Map<ProcessId, KafkaStreamsState> clientStates = mkMap(
+            mkStreamState(1, 2, Optional.of("r1"), mkSet(), mkSet(), mkMap(mkEntry("az", "1"))),
+            mkStreamState(2, 2, Optional.of("r1"), mkSet(), mkSet(), mkMap(mkEntry("az", "2"))),
+            mkStreamState(3, 2, Optional.of("r1"), mkSet(), mkSet(), mkMap(mkEntry("az", "3"))),
+            mkStreamState(4, 2, Optional.of("r1"), mkSet(), mkSet(), mkMap(mkEntry("az", "2")))
+        );
+        final ApplicationState appState = new TestApplicationState(configs, clientStates, taskInfos);
+
+        final AssignedTask.Type active = AssignedTask.Type.ACTIVE;
+        final AssignedTask.Type standby = AssignedTask.Type.STANDBY;
+        final Map<ProcessId, KafkaStreamsAssignment> assignmentsByClient = mkMap(
+            mkAssignment(1, new AssignedTask(TASK_0_0, active), new AssignedTask(TASK_0_1, standby)),
+            mkAssignment(2, new AssignedTask(TASK_0_0, standby), new AssignedTask(TASK_0_1, active)),
+            mkAssignment(3, new AssignedTask(TASK_0_0, standby), new AssignedTask(TASK_0_1, standby)),
+            mkAssignment(4)
+        );
+
+        final RackAwareOptimizationParams params = RackAwareOptimizationParams.of(appState);
+        TaskAssignmentUtils.optimizeRackAwareStandbyTasks(params, assignmentsByClient);
+
+        assertThat(assignmentsByClient.size(), equalTo(4));
+        // Clients 1 to 3 each hold one copy of both tasks before and after the optimization.
+        for (int client = 1; client <= 3; client++) {
+            assertThat(assignmentsByClient.get(processId(client)).tasks().keySet(), equalTo(mkSet(TASK_0_0, TASK_0_1)));
+        }
+        assertThat(assignmentsByClient.get(processId(4)).tasks().keySet(), equalTo(mkSet()));
+    }
+
+    /**
+     * Three clients on racks "rack-1", "rack-2" and "rack-3" each own one active task whose
+     * rack set contains the owning client's rack. The test runs the rack-aware optimization
+     * restricted to those three tasks and expects the assignment to come back unchanged.
+     *
+     * NOTE(review): despite the "StandbyTasks" in the method name, this test only builds
+     * ACTIVE assignments and calls optimizeRackAwareActiveTasks - confirm whether the
+     * standby variant was intended.
+     *
+     * @param strategy the rack-aware assignment strategy handed to the assignment configs
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY,
+    })
+    public void shouldOptimizeStandbyTasksWithMultipleRacks(final String strategy) {
+        final AssignmentConfigs configs = defaultAssignmentConfigs(
+            strategy, 100, 1, 1, Collections.emptyList());
+        final Map<TaskId, TaskInfo> taskInfos = mkMap(
+            mkTaskInfo(TASK_0_0, true, mkSet("rack-1", "rack-2")),
+            mkTaskInfo(TASK_0_1, true, mkSet("rack-2", "rack-3")),
+            mkTaskInfo(TASK_0_2, true, mkSet("rack-3", "rack-4"))
+        );
+        final Map<ProcessId, KafkaStreamsState> clientStates = mkMap(
+            mkStreamState(1, 2, Optional.of("rack-1")),
+            mkStreamState(2, 2, Optional.of("rack-2")),
+            mkStreamState(3, 2, Optional.of("rack-3"))
+        );
+        final ApplicationState appState = new TestApplicationState(configs, clientStates, taskInfos);
+
+        final AssignedTask.Type active = AssignedTask.Type.ACTIVE;
+        final Map<ProcessId, KafkaStreamsAssignment> assignmentsByClient = mkMap(
+            mkAssignment(active, 1, TASK_0_0),
+            mkAssignment(active, 2, TASK_0_1),
+            mkAssignment(active, 3, TASK_0_2)
+        );
+
+        TaskAssignmentUtils.optimizeRackAwareActiveTasks(
+            RackAwareOptimizationParams.of(appState).forTasks(new TreeSet<>(mkSet(TASK_0_0, TASK_0_1, TASK_0_2))),
+            assignmentsByClient
+        );
+
+        assertThat(assignmentsByClient.size(), equalTo(3));
+        assertThat(assignmentsByClient.get(processId(1)).tasks().keySet(), equalTo(mkSet(TASK_0_0)));
+        assertThat(assignmentsByClient.get(processId(2)).tasks().keySet(), equalTo(mkSet(TASK_0_1)));
+        assertThat(assignmentsByClient.get(processId(3)).tasks().keySet(), equalTo(mkSet(TASK_0_2)));
+    }
+
+    /**
+     * Builds five clients, of which client 1 and client 2 each list all three tasks in one
+     * of their two task sets (presumably previous active and previous standby tasks - see
+     * mkStreamState), and checks that identityAssignment returns an entry for every client:
+     * all three task ids for clients 1 and 2, nothing for clients 3 to 5.
+     */
+    @Test
+    public void shouldCorrectlyReturnIdentityAssignment() {
+        final AssignmentConfigs configs = defaultAssignmentConfigs(
+            StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 100, 1, 1, Collections.emptyList());
+        final Map<TaskId, TaskInfo> taskInfos = mkMap(
+            mkTaskInfo(TASK_0_0, true),
+            mkTaskInfo(TASK_0_1, true),
+            mkTaskInfo(TASK_0_2, true)
+        );
+        final Map<ProcessId, KafkaStreamsState> clientStates = mkMap(
+            mkStreamState(1, 5, Optional.empty(), mkSet(TASK_0_0, TASK_0_1, TASK_0_2), mkSet()),
+            mkStreamState(2, 5, Optional.empty(), mkSet(), mkSet(TASK_0_0, TASK_0_1, TASK_0_2)),
+            mkStreamState(3, 5, Optional.empty(), mkSet(), mkSet()),
+            mkStreamState(4, 5, Optional.empty(), mkSet(), mkSet()),
+            mkStreamState(5, 5, Optional.empty(), mkSet(), mkSet())
+        );
+        final ApplicationState appState = new TestApplicationState(configs, clientStates, taskInfos);
+
+        final Map<ProcessId, KafkaStreamsAssignment> identity = TaskAssignmentUtils.identityAssignment(appState);
+
+        assertThat(identity.size(), equalTo(5));
+        // Clients 1 and 2 carried all three tasks in their state.
+        for (int client = 1; client <= 2; client++) {
+            assertThat(identity.get(processId(client)).tasks().keySet(), equalTo(mkSet(TASK_0_0, TASK_0_1, TASK_0_2)));
+        }
+        // Clients 3 to 5 had no tasks in their state.
+        for (int client = 3; client <= 5; client++) {
+            assertThat(identity.get(processId(client)).tasks().keySet(), equalTo(mkSet()));
+        }
+    }
+
+    /**
+     * Exercises TaskAssignmentUtils.validateTaskAssignment against an application state with
+     * two clients (process ids 1 and 2) and a single task, TASK_1_1. One well-formed
+     * assignment and four malformed ones are validated, and each must yield the matching
+     * TaskAssignor.AssignmentError.
+     */
+    @Test
+    public void testValidateTaskAssignment() {
+        final AssignmentConfigs assignmentConfigs = defaultAssignmentConfigs(
+            StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 100, 1, 1, Collections.emptyList());
+        final Map<TaskId, TaskInfo> tasks = mkMap(
+            mkTaskInfo(TASK_1_1, false)
+        );
+        final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = mkMap(
+            mkStreamState(1, 5, Optional.empty()),
+            mkStreamState(2, 5, Optional.empty())
+        );
+        final ApplicationState applicationState = new TestApplicationState(
+            assignmentConfigs, kafkaStreamsStates, tasks);
+
+        // Well-formed: both clients are present and the only task is active exactly once.
+        final TaskAssignor.TaskAssignment noError = new TaskAssignor.TaskAssignment(
+            mkSet(
+                KafkaStreamsAssignment.of(processId(1), mkSet(
+                    new AssignedTask(TASK_1_1, AssignedTask.Type.ACTIVE)
+                )),
+                KafkaStreamsAssignment.of(processId(2), mkSet())
+            )
+        );
+        assertThat(
+            TaskAssignmentUtils.validateTaskAssignment(applicationState, noError),
+            equalTo(TaskAssignor.AssignmentError.NONE)
+        );
+
+        // Client 2 exists in the application state but has no entry in the assignment.
+        final TaskAssignor.TaskAssignment missingProcessId = new TaskAssignor.TaskAssignment(
+            mkSet(
+                KafkaStreamsAssignment.of(processId(1), mkSet(
+                    new AssignedTask(TASK_1_1, AssignedTask.Type.ACTIVE)
+                ))
+            )
+        );
+        assertThat(
+            TaskAssignmentUtils.validateTaskAssignment(applicationState, missingProcessId),
+            equalTo(TaskAssignor.AssignmentError.MISSING_PROCESS_ID)
+        );
+
+        // The assignment contains an extra, randomly generated process id.
+        final TaskAssignor.TaskAssignment unknownProcessId = new TaskAssignor.TaskAssignment(
+            mkSet(
+                KafkaStreamsAssignment.of(processId(1), mkSet(
+                    new AssignedTask(TASK_1_1, AssignedTask.Type.ACTIVE)
+                )),
+                KafkaStreamsAssignment.of(processId(2), mkSet()),
+                KafkaStreamsAssignment.of(ProcessId.randomProcessId(), mkSet())
+            )
+        );
+        assertThat(
+            TaskAssignmentUtils.validateTaskAssignment(applicationState, unknownProcessId),
+            equalTo(TaskAssignor.AssignmentError.UNKNOWN_PROCESS_ID)
+        );
+
+        // Client 2 is given task 13_13, which is not among the application's tasks.
+        final TaskAssignor.TaskAssignment unknownTaskId = new TaskAssignor.TaskAssignment(
+            mkSet(
+                KafkaStreamsAssignment.of(processId(1), mkSet(
+                    new AssignedTask(TASK_1_1, AssignedTask.Type.ACTIVE)
+                )),
+                KafkaStreamsAssignment.of(processId(2), mkSet(
+                    new AssignedTask(new TaskId(13, 13), AssignedTask.Type.ACTIVE)
+                ))
+            )
+        );
+        assertThat(
+            TaskAssignmentUtils.validateTaskAssignment(applicationState, unknownTaskId),
+            equalTo(TaskAssignor.AssignmentError.UNKNOWN_TASK_ID)
+        );
+
+        // The same task is assigned as ACTIVE to both clients.
+        final TaskAssignor.TaskAssignment activeTaskDuplicated = new TaskAssignor.TaskAssignment(
+            mkSet(
+                KafkaStreamsAssignment.of(processId(1), mkSet(
+                    new AssignedTask(TASK_1_1, AssignedTask.Type.ACTIVE)
+                )),
+                KafkaStreamsAssignment.of(processId(2), mkSet(
+                    new AssignedTask(TASK_1_1, AssignedTask.Type.ACTIVE)
+                ))
+            )
+        );
+        assertThat(
+            TaskAssignmentUtils.validateTaskAssignment(applicationState, activeTaskDuplicated),
+            equalTo(TaskAssignor.AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES)
+        );
+    }
+
     public static class TestApplicationState implements ApplicationState {
 
         private final AssignmentConfigs assignmentConfigs;
@@ -293,6 +512,18 @@ public class TaskAssignmentUtilsTest {
         );
     }
 
+    /**
+     * Builds a map entry for the given client: the key is the client's ProcessId and the
+     * value is a KafkaStreamsAssignment for that process holding the supplied tasks.
+     *
+     * @param client integer used to derive the ProcessId via processId(int)
+     * @param tasks  the assigned tasks to place in the assignment; may be empty
+     * @return an entry mapping the process id to its assignment
+     */
+    public static Map.Entry<ProcessId, KafkaStreamsAssignment> mkAssignment(final int client,
+                                                                            final AssignedTask... tasks) {
+        final ProcessId id = processId(client);
+        // Copy the varargs into a fresh mutable set, as the stream collector did before.
+        final HashSet<AssignedTask> assignedTasks = new HashSet<>(Arrays.asList(tasks));
+        final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(id, assignedTasks);
+        return mkEntry(id, assignment);
+    }
+
+    /**
+     * Convenience overload: forwards to the three-argument mkTaskInfo, passing null as the
+     * third argument.
+     * NOTE(review): other call sites pass a set of rack names in that position, so null
+     * presumably means "no rack information" - confirm against the three-argument overload.
+     */
     public static Map.Entry<TaskId, TaskInfo> mkTaskInfo(final TaskId taskId, 
final boolean isStateful) {
         return mkTaskInfo(taskId, isStateful, null);
     }


Reply via email to