mjsax commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170677125


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java:
##########
@@ -97,6 +95,27 @@ public KTableKTableJoinMerger<K, VR> joinMerger() {
         return (KTableKTableJoinMerger<K, VR>) kChangeProcessorSupplier;
     }
 
+    @Override
+    public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
+        enableVersionedSemantics(thisProcessorParameters(), useVersionedSemantics, parentNodeName);
+        enableVersionedSemantics(otherProcessorParameters(), useVersionedSemantics, parentNodeName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void enableVersionedSemantics(final ProcessorParameters<K, ?, ?, ?> processorParameters,
+                                          final boolean useVersionedSemantics,
+                                          final String parentNodeName) {
+        final ProcessorSupplier<K, ?, ?, ?> processorSupplier = processorParameters.processorSupplier();
+        if (!(processorSupplier instanceof KTableKTableAbstractJoin)) {
+            throw new IllegalStateException("Unexpected processor type for table-table join: " + processorSupplier.getClass().getName());
+        }
+        final KTableKTableAbstractJoin<K, ?, ?, ?> tableJoin = (KTableKTableAbstractJoin<K, ?, ?, ?>) processorSupplier;
+
+        if (parentNodeName.equals(tableJoin.joinThisParentNodeName())) {

Review Comment:
   I'm not sure I understand this condition. Can you elaborate? It seems to be 
the only place where we use the newly added `parentNodeName`. Why do we not 
use it elsewhere?



##########
streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java:
##########
@@ -215,14 +226,19 @@ public void testInnerWithVersionedStores() {
             null,
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
+            null,
+            null,
+            null,
+            null,
+            null,

Review Comment:
   I think there is one `null` line too many?



##########
streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java:
##########
@@ -446,14 +482,18 @@ public void testInnerWithRightVersionedOnly() throws Exception {
                 null,
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null,  14L)),
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null,  14L))
+                null,
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  14L)),
+                null,
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null,  14L))

Review Comment:
   Should the last two result rows be flipped? We first get `F-e` when we process 
left-hand `F`, and get nothing when we process right-hand `f`?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -804,6 +806,7 @@ private <VO, VR> KTable<K, VR> doJoin(final KTable<K, VO> other,
         kTableKTableJoinNode.setOutputVersioned(isOutputVersioned);
 
         builder.addGraphNode(this.graphNode, kTableKTableJoinNode);
+        builder.addGraphNode(((KTableImpl<?, ?, ?>) other).graphNode, kTableKTableJoinNode);

Review Comment:
   Yeah. This seems to have been incorrect, but apparently it never surfaced as 
a bug. Nice fix!



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -1098,7 +1101,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
         //not be done needlessly.
         ((KTableImpl<?, ?, ?>) foreignKeyTable).enableSendingOldValues(true);
 
-        //Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
+        //Old values must be sent such that the SubscriptionSendProcessorSupplier can propagate deletions to the correct node.

Review Comment:
   Thank you! Could we do a small follow-up for the 3.4 branch to get this 
backported?


