mjsax commented on code in PR #13996:
URL: https://github.com/apache/kafka/pull/13996#discussion_r1261835084


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java:
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+public class Graph<V extends Comparable<V>> {
+    public class Edge implements Comparable<Edge> {
+        final V destination;
+        final int capacity;
+        final int cost;
+        int residualFlow;
+        int flow;
+        Edge counterEdge;
+
+        public Edge(final V destination, final int capacity, final int cost, final int residualFlow, final int flow) {
+            Objects.requireNonNull(destination);
+            this.destination = destination;
+            this.capacity = capacity;
+            this.cost = cost;
+            this.residualFlow = residualFlow;
+            this.flow = flow;
+        }
+
+        @Override
+        public int compareTo(final Edge o) {

Review Comment:
   `compareTo` is meant to establish an order, right? Why do we order by 
`(destination, capacity, cost)`? Does it matter, or could we use any order as 
long as it is deterministic?
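
To make the question concrete, here is a minimal sketch (not the PR's code) with a hypothetical simplified edge type `E` that carries only the three compared fields. It assumes the only requirement is a stable iteration order: both comparators below give a deterministic result regardless of input order, they just disagree on which edge comes first. `Comparator.comparingInt` is used instead of subtraction, which sidesteps integer overflow for extreme values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

final class OrderDemo {
    // Hypothetical simplified edge: only the fields used by the quoted compareTo.
    static final class E {
        final String destination;
        final int capacity;
        final int cost;

        E(final String destination, final int capacity, final int cost) {
            this.destination = destination;
            this.capacity = capacity;
            this.cost = cost;
        }

        @Override
        public String toString() {
            return destination + "/" + capacity + "/" + cost;
        }
    }

    // Same lexicographic order as the quoted compareTo: destination, capacity, cost.
    static final Comparator<E> BY_DEST_CAP_COST =
        Comparator.comparing((E e) -> e.destination)
            .thenComparingInt(e -> e.capacity)
            .thenComparingInt(e -> e.cost);

    // A different, equally deterministic order: cost, capacity, destination.
    static final Comparator<E> BY_COST_CAP_DEST =
        Comparator.comparingInt((E e) -> e.cost)
            .thenComparingInt(e -> e.capacity)
            .thenComparing(e -> e.destination);

    // Sorts a copy of the input and renders each edge as a string.
    static List<String> sorted(final List<E> edges, final Comparator<E> order) {
        final List<E> copy = new ArrayList<>(edges);
        copy.sort(order);
        final List<String> out = new ArrayList<>();
        for (final E e : copy) {
            out.add(e.toString());
        }
        return out;
    }

    public static void main(final String[] args) {
        final List<E> edges = new ArrayList<>(Arrays.asList(
            new E("b", 1, 5), new E("a", 2, 3), new E("a", 1, 9)));
        System.out.println(sorted(edges, BY_DEST_CAP_COST));
        Collections.reverse(edges); // input order does not change the result
        System.out.println(sorted(edges, BY_DEST_CAP_COST));
        System.out.println(sorted(edges, BY_COST_CAP_DEST));
    }
}
```

If the sorted order feeds into an algorithm whose result depends on tie-breaking, the choice of fields could still matter; the sketch only shows that determinism alone does not pin down one particular order.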



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java:
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+public class Graph<V extends Comparable<V>> {
+    public class Edge implements Comparable<Edge> {
+        final V destination;
+        final int capacity;
+        final int cost;
+        int residualFlow;
+        int flow;
+        Edge counterEdge;
+
+        public Edge(final V destination, final int capacity, final int cost, final int residualFlow, final int flow) {
+            Objects.requireNonNull(destination);
+            this.destination = destination;
+            this.capacity = capacity;
+            this.cost = cost;
+            this.residualFlow = residualFlow;
+            this.flow = flow;
+        }
+
+        @Override
+        public int compareTo(final Edge o) {
+            int compare = destination.compareTo(o.destination);
+            if (compare != 0) {
+                return compare;
+            }
+
+            compare = capacity - o.capacity;
+            if (compare != 0) {
+                return compare;
+            }
+
+            return cost - o.cost;
+        }
+
+        @Override
+        public boolean equals(final Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (other == null || other.getClass() != getClass()) {
+                return false;
+            }
+
+            final Graph<?>.Edge otherEdge = (Graph<?>.Edge) other;
+
+            return destination.equals(otherEdge.destination) && capacity == otherEdge.capacity
+                && cost == otherEdge.cost && residualFlow == otherEdge.residualFlow && flow == otherEdge.flow;

Review Comment:
   We include the mutable fields here, but omit them in `compareTo` -- is this 
on purpose? If yes, why?
   
   I know for sure that `equals` and `hashCode` must use the same fields, so 
that `a.equals(b)` implies `a.hashCode() == b.hashCode()` -- I am not sure if 
`compareTo` also needs to align similarly, i.e., whether `a.compareTo(b)` must 
return `0` if `a.equals(b)`?
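
For reference, the `Comparable` Javadoc strongly recommends, but does not strictly require, that `x.compareTo(y) == 0` exactly when `x.equals(y)`. The sketch below uses a hypothetical stand-in class `DemoEdge` (not the PR's `Edge`) whose `compareTo` ignores the mutable `flow` field while `equals`/`hashCode` include it. It shows what happens in that case: a `TreeSet` deduplicates via `compareTo`, a `HashSet` via `equals`/`hashCode`, so the two disagree on whether the edges are "the same".

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for Graph.Edge: compareTo ignores the mutable `flow`
// field, while equals/hashCode include it.
final class DemoEdge implements Comparable<DemoEdge> {
    final String destination;
    final int capacity;
    final int cost;
    int flow;

    DemoEdge(final String destination, final int capacity, final int cost, final int flow) {
        this.destination = Objects.requireNonNull(destination);
        this.capacity = capacity;
        this.cost = cost;
        this.flow = flow;
    }

    @Override
    public int compareTo(final DemoEdge o) {
        int compare = destination.compareTo(o.destination);
        if (compare != 0) {
            return compare;
        }
        compare = Integer.compare(capacity, o.capacity);
        if (compare != 0) {
            return compare;
        }
        return Integer.compare(cost, o.cost);
    }

    @Override
    public boolean equals(final Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof DemoEdge)) {
            return false;
        }
        final DemoEdge e = (DemoEdge) other;
        return destination.equals(e.destination) && capacity == e.capacity
            && cost == e.cost && flow == e.flow;
    }

    @Override
    public int hashCode() {
        return Objects.hash(destination, capacity, cost, flow);
    }
}

final class CompareToConsistencyDemo {
    // TreeSet treats elements with compareTo == 0 as duplicates.
    static int treeSetSize(final DemoEdge a, final DemoEdge b) {
        final Set<DemoEdge> set = new TreeSet<>();
        set.add(a);
        set.add(b);
        return set.size();
    }

    // HashSet treats elements as duplicates only if equals/hashCode agree.
    static int hashSetSize(final DemoEdge a, final DemoEdge b) {
        final Set<DemoEdge> set = new HashSet<>();
        set.add(a);
        set.add(b);
        return set.size();
    }

    public static void main(final String[] args) {
        final DemoEdge a = new DemoEdge("v", 5, 1, 0);
        final DemoEdge b = new DemoEdge("v", 5, 1, 3); // differs only in flow
        System.out.println("equals: " + a.equals(b));
        System.out.println("compareTo: " + a.compareTo(b));
        System.out.println("TreeSet size: " + treeSetSize(a, b));
        System.out.println("HashSet size: " + hashSetSize(a, b));
    }
}
```

In this sketch `a.equals(b)` still implies `a.compareTo(b) == 0`, because `equals` checks a superset of the compared fields; it is the reverse direction that breaks. Whether that matters here depends on whether `Edge` instances are ever placed in a sorted collection, which the quoted code does not show.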



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java:
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+public class Graph<V extends Comparable<V>> {
+    public class Edge implements Comparable<Edge> {
+        final V destination;
+        final int capacity;
+        final int cost;
+        int residualFlow;
+        int flow;
+        Edge counterEdge;
+
+        public Edge(final V destination, final int capacity, final int cost, final int residualFlow, final int flow) {
+            Objects.requireNonNull(destination);
+            this.destination = destination;
+            this.capacity = capacity;
+            this.cost = cost;
+            this.residualFlow = residualFlow;
+            this.flow = flow;
+        }
+
+        @Override
+        public int compareTo(final Edge o) {
+            int compare = destination.compareTo(o.destination);
+            if (compare != 0) {
+                return compare;
+            }
+
+            compare = capacity - o.capacity;
+            if (compare != 0) {
+                return compare;
+            }
+
+            return cost - o.cost;
+        }
+
+        @Override
+        public boolean equals(final Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (other == null || other.getClass() != getClass()) {
+                return false;
+            }
+
+            final Graph<?>.Edge otherEdge = (Graph<?>.Edge) other;
+
+            return destination.equals(otherEdge.destination) && capacity == otherEdge.capacity
+                && cost == otherEdge.cost && residualFlow == otherEdge.residualFlow && flow == otherEdge.flow;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(destination, capacity, cost, residualFlow, flow);
+        }
+
+        @Override
+        public String toString() {
+            return "{destination= " + destination + ", capacity=" + capacity + ", cost=" + cost
+                + ", residualFlow=" + residualFlow + ", flow=" + flow;
+        }
+    }
+
+    private final SortedMap<V, SortedMap<V, Edge>> adjList = new TreeMap<>();
+    private final SortedSet<V> nodes = new TreeSet<>();
+    private final boolean isResidualGraph;
+    private V sourceNode, sinkNode;
+
+    public Graph() {
+        this(false);
+    }
+
+    private Graph(final boolean isResidualGraph) {
+        this.isResidualGraph = isResidualGraph;
+    }
+
+    public void addEdge(final V u, final V v, final int capacity, final int cost, final int flow) {
+        addEdge(u, new Edge(v, capacity, cost, capacity - flow, flow));
+    }
+
+    public Set<V> getNodes() {
+        return nodes;
+    }
+
+    public Map<V, Edge> getEdges(final V node) {
+        return adjList.get(node);
+    }
+
+    public boolean isResidualGraph() {
+        return isResidualGraph;
+    }
+
+    public void setSourceNode(final V node) {
+        sourceNode = node;
+    }
+
+    public void setSinkNode(final V node) {
+        sinkNode = node;
+    }
+
+    public int getTotalCost() {
+        int totalCost = 0;
+        for (final Map.Entry<V, SortedMap<V, Edge>> nodeEdges : adjList.entrySet()) {
+            final SortedMap<V, Edge> edges = nodeEdges.getValue();
+            for (final Entry<V, Edge> nodeEdge : edges.entrySet()) {
+                totalCost += nodeEdge.getValue().cost * nodeEdge.getValue().flow;
+            }
+        }
+        return totalCost;
+    }
+
+    private void addEdge(final V u, final Edge edge) {
+        if (edge.capacity < 0) {
+            throw new IllegalArgumentException("Edge capacity cannot be negative");

Review Comment:
   Should this check rather go into the constructor of the `Edge` class?
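
A minimal sketch of what that could look like, using a hypothetical standalone class `ValidatedEdge` rather than the PR's inner `Edge` class. The two checks mirror the ones quoted from `addEdge`; the `String.format` arguments are an assumption, because the quoted line is cut off before them. Whether the same checks are valid for edges of the residual graph is not visible in the quoted code, so treat this as an illustration only.

```java
import java.util.Objects;

// Hypothetical standalone version of an edge that validates its own invariants,
// so that no invalid instance can exist regardless of which method creates it.
final class ValidatedEdge {
    final String destination;
    final int capacity;
    final int cost;
    final int flow;

    ValidatedEdge(final String destination, final int capacity, final int cost, final int flow) {
        this.destination = Objects.requireNonNull(destination, "destination");
        if (capacity < 0) {
            throw new IllegalArgumentException("Edge capacity cannot be negative");
        }
        if (flow > capacity) {
            // Argument order is an assumption; the quoted diff truncates this call.
            throw new IllegalArgumentException(
                String.format("Edge flow %d cannot exceed capacity %d", flow, capacity));
        }
        this.capacity = capacity;
        this.cost = cost;
        this.flow = flow;
    }

    public static void main(final String[] args) {
        System.out.println(new ValidatedEdge("v", 5, 1, 2).capacity);
        try {
            new ValidatedEdge("v", -1, 1, 0);
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```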



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java:
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+public class Graph<V extends Comparable<V>> {
+    public class Edge implements Comparable<Edge> {
+        final V destination;
+        final int capacity;
+        final int cost;
+        int residualFlow;
+        int flow;
+        Edge counterEdge;
+
+        public Edge(final V destination, final int capacity, final int cost, final int residualFlow, final int flow) {
+            Objects.requireNonNull(destination);
+            this.destination = destination;
+            this.capacity = capacity;
+            this.cost = cost;
+            this.residualFlow = residualFlow;
+            this.flow = flow;
+        }
+
+        @Override
+        public int compareTo(final Edge o) {
+            int compare = destination.compareTo(o.destination);
+            if (compare != 0) {
+                return compare;
+            }
+
+            compare = capacity - o.capacity;
+            if (compare != 0) {
+                return compare;
+            }
+
+            return cost - o.cost;
+        }
+
+        @Override
+        public boolean equals(final Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (other == null || other.getClass() != getClass()) {
+                return false;
+            }
+
+            final Graph<?>.Edge otherEdge = (Graph<?>.Edge) other;
+
+            return destination.equals(otherEdge.destination) && capacity == otherEdge.capacity
+                && cost == otherEdge.cost && residualFlow == otherEdge.residualFlow && flow == otherEdge.flow;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(destination, capacity, cost, residualFlow, flow);
+        }
+
+        @Override
+        public String toString() {
+            return "{destination= " + destination + ", capacity=" + capacity + ", cost=" + cost
+                + ", residualFlow=" + residualFlow + ", flow=" + flow;
+        }
+    }
+
+    private final SortedMap<V, SortedMap<V, Edge>> adjList = new TreeMap<>();
+    private final SortedSet<V> nodes = new TreeSet<>();
+    private final boolean isResidualGraph;
+    private V sourceNode, sinkNode;
+
+    public Graph() {
+        this(false);
+    }
+
+    private Graph(final boolean isResidualGraph) {
+        this.isResidualGraph = isResidualGraph;
+    }
+
+    public void addEdge(final V u, final V v, final int capacity, final int cost, final int flow) {
+        addEdge(u, new Edge(v, capacity, cost, capacity - flow, flow));
+    }
+
+    public Set<V> getNodes() {

Review Comment:
   In Kafka, it's common practice to omit the `get` prefix on getters, so this 
should just be `nodes()` (similarly for the other getters).



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java:
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+public class Graph<V extends Comparable<V>> {
+    public class Edge implements Comparable<Edge> {
+        final V destination;
+        final int capacity;
+        final int cost;
+        int residualFlow;
+        int flow;
+        Edge counterEdge;
+
+        public Edge(final V destination, final int capacity, final int cost, final int residualFlow, final int flow) {
+            Objects.requireNonNull(destination);
+            this.destination = destination;
+            this.capacity = capacity;
+            this.cost = cost;
+            this.residualFlow = residualFlow;
+            this.flow = flow;
+        }
+
+        @Override
+        public int compareTo(final Edge o) {
+            int compare = destination.compareTo(o.destination);
+            if (compare != 0) {
+                return compare;
+            }
+
+            compare = capacity - o.capacity;
+            if (compare != 0) {
+                return compare;
+            }
+
+            return cost - o.cost;
+        }
+
+        @Override
+        public boolean equals(final Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (other == null || other.getClass() != getClass()) {
+                return false;
+            }
+
+            final Graph<?>.Edge otherEdge = (Graph<?>.Edge) other;
+
+            return destination.equals(otherEdge.destination) && capacity == otherEdge.capacity
+                && cost == otherEdge.cost && residualFlow == otherEdge.residualFlow && flow == otherEdge.flow;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(destination, capacity, cost, residualFlow, flow);
+        }
+
+        @Override
+        public String toString() {
+            return "{destination= " + destination + ", capacity=" + capacity + ", cost=" + cost
+                + ", residualFlow=" + residualFlow + ", flow=" + flow;
+        }
+    }
+
+    private final SortedMap<V, SortedMap<V, Edge>> adjList = new TreeMap<>();

Review Comment:
   Why do we use a nested `SortedMap`? It seems `SortedMap<V, List<Edge>>` would 
be sufficient?
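
To illustrate the trade-off behind this question, here is a small sketch with hypothetical names (`AdjacencyLayouts`, `SimpleEdge`) that stores the same edges in both layouts. The nested map gives direct lookup of the edge `u -> v` and keeps at most one edge per `(u, v)` pair; the list layout needs a scan and allows parallel edges. Whether the PR actually relies on lookups by `(u, v)` is not visible in the quoted code, so that part is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

final class AdjacencyLayouts {
    // Hypothetical minimal edge for illustration.
    static final class SimpleEdge {
        final String destination;
        final int cost;

        SimpleEdge(final String destination, final int cost) {
            this.destination = destination;
            this.cost = cost;
        }
    }

    // Layout used in the PR: source -> (destination -> edge).
    final SortedMap<String, SortedMap<String, SimpleEdge>> nested = new TreeMap<>();
    // Suggested alternative: source -> list of outgoing edges.
    final SortedMap<String, List<SimpleEdge>> flat = new TreeMap<>();

    void add(final String u, final String v, final int cost) {
        final SimpleEdge edge = new SimpleEdge(v, cost);
        // put() replaces any existing u -> v edge, so the pair stays unique.
        nested.computeIfAbsent(u, k -> new TreeMap<>()).put(v, edge);
        // add() appends, so parallel u -> v edges can accumulate.
        flat.computeIfAbsent(u, k -> new ArrayList<>()).add(edge);
    }

    // Direct lookup in the nested layout.
    SimpleEdge findNested(final String u, final String v) {
        final SortedMap<String, SimpleEdge> edges = nested.get(u);
        return edges == null ? null : edges.get(v);
    }

    // Linear scan in the list layout; returns the first match.
    SimpleEdge findFlat(final String u, final String v) {
        final List<SimpleEdge> edges = flat.get(u);
        if (edges == null) {
            return null;
        }
        for (final SimpleEdge edge : edges) {
            if (edge.destination.equals(v)) {
                return edge;
            }
        }
        return null;
    }

    public static void main(final String[] args) {
        final AdjacencyLayouts graph = new AdjacencyLayouts();
        graph.add("a", "b", 1);
        graph.add("a", "b", 2);
        System.out.println("nested edges from a: " + graph.nested.get("a").size());
        System.out.println("flat edges from a: " + graph.flat.get("a").size());
    }
}
```

If the algorithm only ever iterates over all outgoing edges of a node, the list layout would indeed be enough; the nested map pays off when individual edges (for example counter edges in a residual graph) are looked up by destination.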



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java:
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+public class Graph<V extends Comparable<V>> {
+    public class Edge implements Comparable<Edge> {

Review Comment:
   Why do we implement `Comparable`? I could not spot the reason why it's 
required.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java:
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+public class Graph<V extends Comparable<V>> {
+    public class Edge implements Comparable<Edge> {
+        final V destination;
+        final int capacity;
+        final int cost;
+        int residualFlow;
+        int flow;
+        Edge counterEdge;
+
+        public Edge(final V destination, final int capacity, final int cost, final int residualFlow, final int flow) {
+            Objects.requireNonNull(destination);
+            this.destination = destination;
+            this.capacity = capacity;
+            this.cost = cost;
+            this.residualFlow = residualFlow;
+            this.flow = flow;
+        }
+
+        @Override
+        public int compareTo(final Edge o) {
+            int compare = destination.compareTo(o.destination);
+            if (compare != 0) {
+                return compare;
+            }
+
+            compare = capacity - o.capacity;
+            if (compare != 0) {
+                return compare;
+            }
+
+            return cost - o.cost;
+        }
+
+        @Override
+        public boolean equals(final Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (other == null || other.getClass() != getClass()) {
+                return false;
+            }
+
+            final Graph<?>.Edge otherEdge = (Graph<?>.Edge) other;
+
+            return destination.equals(otherEdge.destination) && capacity == otherEdge.capacity
+                && cost == otherEdge.cost && residualFlow == otherEdge.residualFlow && flow == otherEdge.flow;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(destination, capacity, cost, residualFlow, flow);
+        }
+
+        @Override
+        public String toString() {
+            return "{destination= " + destination + ", capacity=" + capacity + ", cost=" + cost
+                + ", residualFlow=" + residualFlow + ", flow=" + flow;
+        }
+    }
+
+    private final SortedMap<V, SortedMap<V, Edge>> adjList = new TreeMap<>();
+    private final SortedSet<V> nodes = new TreeSet<>();
+    private final boolean isResidualGraph;
+    private V sourceNode, sinkNode;
+
+    public Graph() {
+        this(false);
+    }
+
+    private Graph(final boolean isResidualGraph) {
+        this.isResidualGraph = isResidualGraph;
+    }
+
+    public void addEdge(final V u, final V v, final int capacity, final int cost, final int flow) {
+        addEdge(u, new Edge(v, capacity, cost, capacity - flow, flow));
+    }
+
+    public Set<V> getNodes() {
+        return nodes;
+    }
+
+    public Map<V, Edge> getEdges(final V node) {
+        return adjList.get(node);
+    }
+
+    public boolean isResidualGraph() {
+        return isResidualGraph;
+    }
+
+    public void setSourceNode(final V node) {
+        sourceNode = node;
+    }
+
+    public void setSinkNode(final V node) {
+        sinkNode = node;
+    }
+
+    public int getTotalCost() {
+        int totalCost = 0;
+        for (final Map.Entry<V, SortedMap<V, Edge>> nodeEdges : adjList.entrySet()) {
+            final SortedMap<V, Edge> edges = nodeEdges.getValue();
+            for (final Entry<V, Edge> nodeEdge : edges.entrySet()) {
+                totalCost += nodeEdge.getValue().cost * nodeEdge.getValue().flow;
+            }
+        }
+        return totalCost;
+    }
+
+    private void addEdge(final V u, final Edge edge) {
+        if (edge.capacity < 0) {
+            throw new IllegalArgumentException("Edge capacity cannot be negative");
+        }
+        if (edge.flow > edge.capacity) {
+            throw new IllegalArgumentException(String.format("Edge flow %d cannot exceed capacity %d",

Review Comment:
   Same as for the capacity check above: should this rather go into the constructor of the `Edge` class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
