lihaosky commented on code in PR #14139:
URL: https://github.com/apache/kafka/pull/14139#discussion_r1282664432


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java:
##########
@@ -16,18 +16,29 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import java.util.Optional;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.TaskId;
 
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
 
 public interface TaskAssignor {
     /**
      * @return whether the generated assignment requires a followup probing rebalance to satisfy all conditions
      */
-    boolean assign(Map<UUID, ClientState> clients,
-                   Set<TaskId> allTaskIds,
-                   Set<TaskId> statefulTaskIds,
-                   AssignorConfiguration.AssignmentConfigs configs);
+    boolean assign(final Map<UUID, ClientState> clients,
+                   final Set<TaskId> allTaskIds,
+                   final Set<TaskId> statefulTaskIds,
+                   final Cluster cluster,
+                   final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                   final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask,
+                   final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+                   final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer,
+                   final InternalTopicManager internalTopicManager,

Review Comment:
   I'm totally fine with creating `RackAwareTaskAssignor` outside and passing it 
to `TaskAssignor`. It definitely makes the interface cleaner. cc @mjsax 
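
   To make the suggestion concrete, here is a minimal sketch of what the slimmer signature might look like if the caller builds the `RackAwareTaskAssignor` and passes it in. The nested types below are empty placeholders, not the real Kafka Streams classes, and the class name `TaskAssignorSketch`, the helper `requiresFollowup()`, and the parameter order are assumptions for illustration only, not what this PR settles on.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class TaskAssignorSketch {
    // Empty placeholders standing in for the real Kafka Streams types (assumption).
    static final class ClientState { }
    static final class TaskId { }
    static final class AssignmentConfigs { }
    static final class RackAwareTaskAssignor { }

    // Hypothetical slimmer interface: the rack-aware helper is constructed by the
    // caller, so cluster metadata, partition maps and the InternalTopicManager
    // no longer appear in the assign() signature.
    interface TaskAssignor {
        boolean assign(Map<UUID, ClientState> clients,
                       Set<TaskId> allTaskIds,
                       Set<TaskId> statefulTaskIds,
                       RackAwareTaskAssignor rackAwareTaskAssignor,
                       AssignmentConfigs configs);
    }

    // Runs a no-op assignor that never asks for a followup probing rebalance.
    static boolean requiresFollowup() {
        final TaskAssignor noop = (clients, all, stateful, rackAware, configs) -> false;
        return noop.assign(Map.of(), Set.of(), Set.of(),
                           new RackAwareTaskAssignor(), new AssignmentConfigs());
    }

    public static void main(final String[] args) {
        System.out.println("followup probing rebalance required: " + requiresFollowup());
    }
}
```

   With this shape, the assignor implementations only see the one extra collaborator, and everything needed to build it stays with the caller.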



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -43,26 +51,39 @@
 import org.slf4j.LoggerFactory;
 
 public class RackAwareTaskAssignor {
+
+    @FunctionalInterface
+    public interface MoveStandbyTaskPredicate {
+        boolean canMove(final ClientState source,
+                        final ClientState destination,
+                        final TaskId taskId,
+                        final Map<UUID, ClientState> clientStateMap);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
 
     private static final int SOURCE_ID = -1;
 
     private final Cluster fullMetadata;
     private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask;
     private final AssignmentConfigs assignmentConfigs;
     private final Map<TopicPartition, Set<String>> racksForPartition;
     private final Map<UUID, String> racksForProcess;
     private final InternalTopicManager internalTopicManager;
     private final boolean validClientRack;
+    private Boolean canEnable = null;
 
     public RackAwareTaskAssignor(final Cluster fullMetadata,
                                  final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                 final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask,
                                  final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,

Review Comment:
   It should be used for option II of the KIP, min cost with balanced sub-topology:
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-II.Mincostwithbalancedsub-topology
   However, I may not have time to implement that option for the 3.6 release. The 
current plan is to implement option I, min cost with unbalanced sub-topology, for 3.6:
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-I.Mincostwithunbalancedsub-topology
   The balanced sub-topology option would then be added later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
