guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r956662529
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +362,95 @@ private void mergeDuplicateSourceNodes() {
}
}
+ /**
+ * If the join is a self-join, remove the node KStreamJoinWindow
corresponding to the
+ */
+ @SuppressWarnings("unchecked")
+ private void rewriteSelfJoin(final GraphNode currentNode, final
Set<GraphNode> visited) {
+ visited.add(currentNode);
+ if (currentNode instanceof StreamStreamJoinNode &&
isSelfJoin(currentNode)) {
+ ((StreamStreamJoinNode) currentNode).setSelfJoin();
+ // Remove JoinOtherWindowed node
+ final GraphNode parent =
currentNode.parentNodes().stream().findFirst().get();
+ GraphNode left = null, right = null;
+ for (final GraphNode child: parent.children()) {
+ if (child instanceof ProcessorGraphNode
+ && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+ if (left == null) {
+ left = child;
+ } else {
+ right = child;
+ }
+ }
+ }
+ // Sanity check
+ if (left != null && right != null && left.buildPriority() <
right.buildPriority()) {
+ parent.removeChild(right);
+ }
+ }
+ for (final GraphNode child: currentNode.children()) {
+ if (!visited.contains(child)) {
+ rewriteSelfJoin(child, visited);
+ }
+ }
+ }
+
+ /**
+ * The self-join rewriting can be applied if:
+ * 1. The path from the StreamStreamJoinNode to the root contains a single
source node.
+ * 2. The StreamStreamJoinNode has a single parent.
+ * 3. There are no other nodes besides the KStreamJoinWindow that are
siblings of the
+ * StreamStreamJoinNode and have smaller build priority.
+ */
+ private boolean isSelfJoin(final GraphNode streamJoinNode) {
+ final AtomicInteger count = new AtomicInteger();
+ countSourceNodes(count, streamJoinNode, new HashSet<>());
+ if (count.get() > 1) {
+ return false;
+ }
+ if (streamJoinNode.parentNodes().size() > 1) {
+ return false;
+ }
+ for (final GraphNode parent: streamJoinNode.parentNodes()) {
+ for (final GraphNode sibling : parent.children()) {
+ if (sibling instanceof ProcessorGraphNode) {
+ if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+ continue;
+ }
+ }
+ if (sibling != streamJoinNode
+ && sibling.buildPriority() <
streamJoinNode.buildPriority()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private void countSourceNodes(
+ final AtomicInteger count,
+ final GraphNode currentNode,
+ final Set<GraphNode> visited) {
+
+ if (currentNode instanceof StreamSourceNode) {
+ count.incrementAndGet();
+ }
+
+ for (final GraphNode parent: currentNode.parentNodes()) {
+ if (!visited.contains(parent)) {
+ visited.add(parent);
+ countSourceNodes(count, parent, visited);
+ }
+ }
+ }
+
+ private boolean isStreamJoinWindowNode(final ProcessorGraphNode node) {
+ return node.processorParameters() != null
Review Comment:
Why do we need the checks other than the last one? Since this is called after
the whole pass, the logical plan should be complete, and hence all of the
checked fields should be non-null (otherwise it would be a bug).
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig {
public static final String TOPOLOGY_OPTIMIZATION_CONFIG =
"topology.optimization";
private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration
telling Kafka Streams if it should optimize the topology, disabled by default";
+ public static final String SELF_JOIN_OPTIMIZATION_CONFIG =
"self.join.optimization";
Review Comment:
I still have some concerns about the extra-config pattern here, but I will
leave that for open discussion in the KIP thread.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final
GraphNode node) {
// use this method for testing only
public void buildAndOptimizeTopology() {
- buildAndOptimizeTopology(false);
+ buildAndOptimizeTopology(false, false);
}
- public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+ public void buildAndOptimizeTopology(
+ final boolean optimizeTopology, final boolean optimizeSelfJoin) {
mergeDuplicateSourceNodes();
if (optimizeTopology) {
LOG.debug("Optimizing the Kafka Streams graph for repartition
nodes");
optimizeKTableSourceTopics();
maybeOptimizeRepartitionOperations();
+ if (optimizeSelfJoin) {
Review Comment:
This is a meta comment: we previously used a "two-pass" mechanism to apply
optimizations, since originally we did not pass in the `Configs` as part of the
StreamsBuilder.build() process. That two-pass mechanism has introduced a lot of
issues and complications (I think you've felt some of them this time.. :P ), so
the current plan is to get rid of it and instead optimize the topology in the
first pass, as we build the logical plan. So could you consider an approach
other than rewriting self-joins, if possible? That way, we potentially would
not need to consider awkward applicability conditions like the ones you have in
https://github.com/apache/kafka/pull/12555/files#diff-ac1bf2b23b80784dec20b00fdc42f2df7e5a5133d6c68978fa44aea11e950c3aR398-R404
below.
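
To make the single-pass suggestion concrete, here is a rough sketch of
deciding about the self-join while the join node is being added, rather than
removing a window node from the finished graph afterwards. Everything in it is
hypothetical: `Node`, `addJoin`, and the `optimizeSelfJoin` flag are simplified
stand-ins, not the actual Kafka Streams classes or the code in this PR. It
also assumes the simplest case only, where both sides of the join are the very
same node object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a logical plan; NOT the Kafka Streams API.
public class SinglePassSelfJoinSketch {

    // A toy graph node that only tracks its name, children, and a self-join flag.
    static class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        boolean selfJoin = false;

        Node(final String name) {
            this.name = name;
        }

        void addChild(final Node child) {
            children.add(child);
        }
    }

    // Adds the join subgraph in one pass. When the optimization is enabled and
    // both inputs are the same node, the second window node is never created,
    // so nothing has to be removed from the graph later.
    static Node addJoin(final Node left, final Node right, final boolean optimizeSelfJoin) {
        final Node join = new Node("join");
        left.addChild(new Node("left-window"));
        if (optimizeSelfJoin && left == right) {
            join.selfJoin = true;
        } else {
            right.addChild(new Node("right-window"));
        }
        left.addChild(join);
        if (right != left) {
            right.addChild(join);
        }
        return join;
    }

    public static void main(final String[] args) {
        final Node source = new Node("source");
        final Node join = addJoin(source, source, true);
        System.out.println("selfJoin=" + join.selfJoin
            + ", children of source=" + source.children.size());
    }
}
```

Whether an identity check like `left == right` would cover the cases this PR
targets (for example, two streams derived from the same source through
different operators) is not something this sketch settles.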
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.