mjsax commented on code in PR #14139:
URL: https://github.com/apache/kafka/pull/14139#discussion_r1284866806
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##########
@@ -43,21 +43,27 @@
public class HighAvailabilityTaskAssignor implements TaskAssignor {
private static final Logger log = LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
+ private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
+ private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;
+ private static final int DEFAULT_STATELESS_TRAFFIC_COST = 1;
+ private static final int DEFAULT_STATELESS_NON_OVERLAP_COST = 1;
Review Comment:
We have different costs for stateful vs stateless here, but the configs
added only allow passing in one value each for traffic and non-overlap. How
does this fit together?
From the code below it seems users only configure the cost for stateful tasks anyway,
but the stateless cost is always `1` (in this case, the variables should not be
called `DEFAULT` because they cannot be changed).
Also, if we have default values, should they not be set in `StreamsConfig`?
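A minimal sketch of the naming point, under assumptions: the user-configurable stateful costs keep `DEFAULT_` constants as fallbacks for unset (null) configs, while the fixed stateless costs become a plain constant with no `DEFAULT_` prefix. The class `RackAwareCosts`, its `statefulCosts` factory and the `STATELESS` constant are hypothetical; only the numeric values come from the diff. Whether the defaults should move into `StreamsConfig` is the open question above and is not shown here.

```java
/** Hypothetical value holder for the two rack-aware assignment costs. */
final class RackAwareCosts {
    // Fallbacks for the user-configurable stateful costs (values copied from the diff).
    // In the real code these would presumably live next to the config definitions.
    static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
    static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;

    // Stateless costs cannot be configured, so they are a fixed constant
    // and deliberately carry no DEFAULT_ prefix.
    static final RackAwareCosts STATELESS = new RackAwareCosts(1, 1);

    final int trafficCost;
    final int nonOverlapCost;

    private RackAwareCosts(final int trafficCost, final int nonOverlapCost) {
        this.trafficCost = trafficCost;
        this.nonOverlapCost = nonOverlapCost;
    }

    /** Resolves the stateful costs, using the defaults when a config value is unset (null). */
    static RackAwareCosts statefulCosts(final Integer configuredTrafficCost,
                                        final Integer configuredNonOverlapCost) {
        final int traffic = configuredTrafficCost == null
            ? DEFAULT_STATEFUL_TRAFFIC_COST
            : configuredTrafficCost;
        final int nonOverlap = configuredNonOverlapCost == null
            ? DEFAULT_STATEFUL_NON_OVERLAP_COST
            : configuredNonOverlapCost;
        return new RackAwareCosts(traffic, nonOverlap);
    }
}
```

With this split, only the stateful pair is ever read from configs, which matches the observation that the added configs carry a single value each.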
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java:
##########
@@ -57,6 +57,7 @@ public StickyTaskAssignor() {
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
+ final RackAwareTaskAssignor rackAwareTaskAssignor,
Review Comment:
Should we make this an `Optional` ?
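A short sketch of what the `Optional` variant could look like, assuming the parameter becomes `Optional<RackAwareTaskAssignor>`: "no rack-aware assignor" is then explicit in the signature, and the `!= null &&` check from the diff turns into a `map`/`orElse` chain. The `RackAwareTaskAssignor` interface here is a hypothetical stand-in that models only the `canEnableRackAwareAssignor()` method seen in the diff, and `OptionalAssignorSketch` is an illustrative name.

```java
import java.util.Optional;

// Hypothetical stand-in for the real RackAwareTaskAssignor; only the
// method used in the diff is modelled.
interface RackAwareTaskAssignor {
    boolean canEnableRackAwareAssignor();
}

final class OptionalAssignorSketch {
    /**
     * Callers pass Optional.empty() instead of null when rack awareness is off.
     * Returns true only if an assignor is present and it can be enabled.
     */
    static boolean shouldOptimizeForRacks(final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor) {
        // Equivalent to `rackAwareTaskAssignor != null && rackAwareTaskAssignor.canEnableRackAwareAssignor()`.
        return rackAwareTaskAssignor
            .map(RackAwareTaskAssignor::canEnableRackAwareAssignor)
            .orElse(false);
    }
}
```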
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##########
@@ -124,11 +132,20 @@ private static void assignActiveStatefulTasks(final SortedMap<UUID, ClientState>
ClientState::assignActive,
(source, destination) -> true
);
+
+ if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
+ final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ?
Review Comment:
Why do we compute `trafficCost` on every call? It seems we should set it up
just once in the constructor (or implement the `Configurable` interface). (Same
for `nonOverlapCost`.)
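A sketch of the constructor-based option, under assumptions: the costs are resolved once into `final` fields, and every later assignment reuses them. `CostConfigs` is a hypothetical stand-in for `AssignorConfiguration.AssignmentConfigs` that models only two nullable fields; `rackAwareAssignmentTrafficCost` is the name used in the diff, while `rackAwareAssignmentNonOverlapCost` is assumed. The default values are copied from the diff. The `Configurable` alternative would move the same resolution into `configure(Map<String, ?>)`, the single method of `org.apache.kafka.common.Configurable`; it is not shown because it needs the Kafka clients library.

```java
/** Hypothetical stand-in for AssignmentConfigs; only the two cost fields are modelled. */
final class CostConfigs {
    final Integer rackAwareAssignmentTrafficCost;    // name taken from the diff
    final Integer rackAwareAssignmentNonOverlapCost; // name assumed

    CostConfigs(final Integer trafficCost, final Integer nonOverlapCost) {
        this.rackAwareAssignmentTrafficCost = trafficCost;
        this.rackAwareAssignmentNonOverlapCost = nonOverlapCost;
    }
}

/** Sketch of an assignor that resolves its costs once instead of on every assign() call. */
final class CostAwareAssignorSketch {
    private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
    private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;

    private final int trafficCost;
    private final int nonOverlapCost;

    CostAwareAssignorSketch(final CostConfigs configs) {
        // Resolved a single time at construction; null means "not configured".
        this.trafficCost = configs.rackAwareAssignmentTrafficCost == null
            ? DEFAULT_STATEFUL_TRAFFIC_COST
            : configs.rackAwareAssignmentTrafficCost;
        this.nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null
            ? DEFAULT_STATEFUL_NON_OVERLAP_COST
            : configs.rackAwareAssignmentNonOverlapCost;
    }

    // An assign(...) method would read these fields rather than re-reading configs.
    int trafficCost() {
        return trafficCost;
    }

    int nonOverlapCost() {
        return nonOverlapCost;
    }
}
```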
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java:
##########
@@ -16,18 +16,29 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
+import java.util.Optional;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
public interface TaskAssignor {
/**
* @return whether the generated assignment requires a followup probing rebalance to satisfy all conditions
*/
- boolean assign(Map<UUID, ClientState> clients,
- Set<TaskId> allTaskIds,
- Set<TaskId> statefulTaskIds,
- AssignorConfiguration.AssignmentConfigs configs);
+ boolean assign(final Map<UUID, ClientState> clients,
+ final Set<TaskId> allTaskIds,
+ final Set<TaskId> statefulTaskIds,
+ final Cluster cluster,
+ final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+ final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask,
+ final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+ final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer,
+ final InternalTopicManager internalTopicManager,
Review Comment:
It's internal, so I am fine with it either way.
--
This is an automated message from the Apache Git Service.