guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r956662529


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +362,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final 
Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {
+            for (final GraphNode sibling : parent.children()) {
+                if (sibling instanceof ProcessorGraphNode) {
+                    if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+                        continue;
+                    }
+                }
+                if (sibling != streamJoinNode
+                    && sibling.buildPriority() < 
streamJoinNode.buildPriority()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void countSourceNodes(
+        final AtomicInteger count,
+        final GraphNode currentNode,
+        final Set<GraphNode> visited) {
+
+        if (currentNode instanceof StreamSourceNode) {
+            count.incrementAndGet();
+        }
+
+        for (final GraphNode parent: currentNode.parentNodes()) {
+            if (!visited.contains(parent)) {
+                visited.add(parent);
+                countSourceNodes(count, parent, visited);
+            }
+        }
+    }
+
+    private boolean isStreamJoinWindowNode(final ProcessorGraphNode node) {
+        return node.processorParameters() != null

Review Comment:
   Why we need the other checks except the last one? Since this is called after 
the whole pass the logical plan should be complete and hence all of the checked 
fields should be null (otherwise it should be a bug)?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig {
     public static final String TOPOLOGY_OPTIMIZATION_CONFIG = 
"topology.optimization";
     private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration 
telling Kafka Streams if it should optimize the topology, disabled by default";
 
+    public static final String SELF_JOIN_OPTIMIZATION_CONFIG = 
"self.join.optimization";

Review Comment:
   I still have some concerns about the extra configs pattern here, but I will 
leave for open discuss in the KIP thread.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   This is a meta comment: we used a "two-pass" mechanism to apply optimization 
before since originally we did not pass in the `Configs` as part of the 
StreamsBuilder.build() process, and that two-pass mechanism has introduced a 
lot of issues and complicates (I think you've felt some at this time.. :P ) so 
the current plan is to get rid of the two-pass mechanism but try to optimize 
the topology in the first pass as we build the logical plan. So could you 
consider another approach than rewriting self joins if possible? In this way, 
we potentially would not need to consider awkward applicabilities like you have 
in 
https://github.com/apache/kafka/pull/12555/files#diff-ac1bf2b23b80784dec20b00fdc42f2df7e5a5133d6c68978fa44aea11e950c3aR398-R404
 below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to