This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c7d3999d4f5 KAFKA-18026: KIP-1112, clean up graph node grace period 
resolution (#18342)
c7d3999d4f5 is described below

commit c7d3999d4f5f6cbe7bb2be60155ee6e607a7152d
Author: A. Sophie Blee-Goldman <ableegold...@gmail.com>
AuthorDate: Wed Jan 15 18:01:19 2025 -0800

    KAFKA-18026: KIP-1112, clean up graph node grace period resolution (#18342)
    
    Minor followup to #18195 that I split out into a separate PR since that one 
was getting a bit long. Should be rebased & reviewed after that one is merged.
    
    Introduces a new class for windowed graph nodes with a grace period defined 
to improve (slightly) the type safety
    
    Reviewers: Guozhang Wang <guozhang.wang...@gmail.com>, Almog Gavra 
<al...@responsive.dev>
---
 .../internals/CogroupedStreamAggregateBuilder.java | 23 ++++---
 .../internals/GroupedStreamAggregateBuilder.java   | 76 ++++++++++++++++------
 .../kstream/internals/KGroupedStreamImpl.java      |  4 +-
 .../streams/kstream/internals/KStreamImpl.java     |  3 +-
 .../internals/SessionWindowedKStreamImpl.java      | 18 +++--
 .../internals/SlidingWindowedKStreamImpl.java      | 15 +++--
 .../kstream/internals/TimeWindowedKStreamImpl.java | 15 +++--
 .../internals/graph/GracePeriodGraphNode.java      | 38 +++++++++++
 .../internals/graph/GraphGraceSearchUtil.java      | 38 +----------
 .../kstream/internals/SuppressScenarioTest.java    | 28 +++++++-
 .../internals/graph/GraphGraceSearchUtilTest.java  | 44 ++++++++-----
 11 files changed, 197 insertions(+), 105 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index 1ce1e7451a7..a450f8ead1a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.kstream.internals.graph.GracePeriodGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
 import 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
@@ -110,10 +111,11 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                 "-cogroup-agg-" + counter++,
                 builder,
                 CogroupedKStreamImpl.AGGREGATE_NAME);
-            final ProcessorGraphNode<K, ?> aggProcessorNode =
-                new ProcessorGraphNode<>(
+            final GracePeriodGraphNode<K, ?> aggProcessorNode =
+                new GracePeriodGraphNode<>(
                     kStreamAggProcessorName,
-                    new ProcessorParameters<>(parentProcessor, 
kStreamAggProcessorName)
+                    new ProcessorParameters<>(parentProcessor, 
kStreamAggProcessorName),
+                    windows.gracePeriodMs()
                 );
             processors.add(aggProcessorNode);
             builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
aggProcessorNode);
@@ -149,10 +151,12 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                 "-cogroup-agg-" + counter++,
                 builder,
                 CogroupedKStreamImpl.AGGREGATE_NAME);
-            final ProcessorGraphNode<K, ?> aggProcessorNode =
-                new ProcessorGraphNode<>(
+            final long gracePeriod = sessionWindows.gracePeriodMs() + 
sessionWindows.inactivityGap();
+            final GracePeriodGraphNode<K, ?> aggProcessorNode =
+                new GracePeriodGraphNode<>(
                     kStreamAggProcessorName,
-                    new ProcessorParameters<>(parentProcessor, 
kStreamAggProcessorName)
+                    new ProcessorParameters<>(parentProcessor, 
kStreamAggProcessorName),
+                    gracePeriod
                 );
             processors.add(aggProcessorNode);
             builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
aggProcessorNode);
@@ -187,10 +191,11 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                 "-cogroup-agg-" + counter++,
                 builder,
                 CogroupedKStreamImpl.AGGREGATE_NAME);
-            final ProcessorGraphNode<K, ?> aggProcessorNode =
-                new ProcessorGraphNode<>(
+            final GracePeriodGraphNode<K, ?> aggProcessorNode =
+                new GracePeriodGraphNode<>(
                     kStreamAggProcessorName,
-                    new ProcessorParameters<>(parentProcessor, 
kStreamAggProcessorName)
+                    new ProcessorParameters<>(parentProcessor, 
kStreamAggProcessorName),
+                    slidingWindows.gracePeriodMs()
                 );
             processors.add(aggProcessorNode);
             builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
aggProcessorNode);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index bc84e69a1ff..b99034c5306 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -20,10 +20,10 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.internals.graph.GracePeriodGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
 
 import java.util.Collections;
 import java.util.Set;
@@ -66,23 +66,67 @@ class GroupedStreamAggregateBuilder<K, V> {
         this.userProvidedRepartitionTopicName = groupedInternal.name();
     }
 
-    <KR, VR> KTable<KR, VR> build(final NamedInternal functionName,
-                                  final StoreFactory storeFactory,
-                                  final KStreamAggProcessorSupplier<K, V, KR, 
VR> aggregateSupplier,
-                                  final String queryableStoreName,
-                                  final Serde<KR> keySerde,
-                                  final Serde<VR> valueSerde,
-                                  final boolean isOutputVersioned) {
-        assert queryableStoreName == null || 
queryableStoreName.equals(storeFactory.storeName());
+    <KR, VR> KTable<KR, VR> buildNonWindowed(final NamedInternal functionName,
+                                             final String storeName,
+                                             final 
KStreamAggProcessorSupplier<K, V, KR, VR> aggregateSupplier,
+                                             final String queryableStoreName,
+                                             final Serde<KR> keySerde,
+                                             final Serde<VR> valueSerde,
+                                             final boolean isOutputVersioned) {
+        final String aggFunctionName = functionName.name();
+
+        final ProcessorGraphNode<K, V> aggProcessorNode =
+            new ProcessorGraphNode<>(
+                aggFunctionName,
+                new ProcessorParameters<>(aggregateSupplier, aggFunctionName)
+            );
 
+        aggProcessorNode.setOutputVersioned(isOutputVersioned);
+
+        return build(aggFunctionName, storeName, aggregateSupplier, 
aggProcessorNode, queryableStoreName, keySerde, valueSerde);
+    }
+
+    <KR, VR> KTable<KR, VR> buildWindowed(final NamedInternal functionName,
+                                          final String storeName,
+                                          final long gracePeriod,
+                                          final KStreamAggProcessorSupplier<K, 
V, KR, VR> aggregateSupplier,
+                                          final String queryableStoreName,
+                                          final Serde<KR> keySerde,
+                                          final Serde<VR> valueSerde,
+                                          final boolean isOutputVersioned) {
         final String aggFunctionName = functionName.name();
 
+        final GracePeriodGraphNode<K, V> gracePeriodAggProcessorNode =
+            new GracePeriodGraphNode<>(
+                aggFunctionName,
+                new ProcessorParameters<>(aggregateSupplier, aggFunctionName),
+                gracePeriod
+            );
+
+        gracePeriodAggProcessorNode.setOutputVersioned(isOutputVersioned);
+
+        return build(aggFunctionName, storeName, aggregateSupplier, 
gracePeriodAggProcessorNode, queryableStoreName, keySerde, valueSerde);
+    }
+
+    private <KR, VR> KTable<KR, VR> build(final String aggFunctionName,
+                                          final String storeName,
+                                          final KStreamAggProcessorSupplier<K, 
V, KR, VR> aggregateSupplier,
+                                          final ProcessorGraphNode<K, V> 
aggProcessorNode,
+                                          final String queryableStoreName,
+                                          final Serde<KR> keySerde,
+                                          final Serde<VR> valueSerde) {
+        if (!(queryableStoreName == null || 
queryableStoreName.equals(storeName))) {
+            throw new IllegalStateException(String.format("queryableStoreName 
should be null or equal to storeName"
+                                                              + " but got 
storeName='%s' and queryableStoreName='%s'",
+                                                          storeName, 
queryableStoreName));
+        }
+
         String sourceName = this.name;
         GraphNode parentNode = graphNode;
 
         if (repartitionRequired) {
             final OptimizableRepartitionNodeBuilder<K, V> 
repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
-            final String repartitionTopicPrefix = 
userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : 
storeFactory.storeName();
+            final String repartitionTopicPrefix = 
userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : 
storeName;
             sourceName = createRepartitionSource(repartitionTopicPrefix, 
repartitionNodeBuilder);
 
             // First time through we need to create a repartition node.
@@ -97,14 +141,7 @@ class GroupedStreamAggregateBuilder<K, V> {
             parentNode = repartitionNode;
         }
 
-        final ProcessorGraphNode<K, V> statefulProcessorNode =
-            new ProcessorGraphNode<>(
-                aggFunctionName,
-                new ProcessorParameters<>(aggregateSupplier, aggFunctionName)
-            );
-        statefulProcessorNode.setOutputVersioned(isOutputVersioned);
-
-        builder.addGraphNode(parentNode, statefulProcessorNode);
+        builder.addGraphNode(parentNode, aggProcessorNode);
 
         return new KTableImpl<>(aggFunctionName,
                                 keySerde,
@@ -112,9 +149,8 @@ class GroupedStreamAggregateBuilder<K, V> {
                                 sourceName.equals(this.name) ? 
subTopologySourceNodes : Collections.singleton(sourceName),
                                 queryableStoreName,
                                 aggregateSupplier,
-                                statefulProcessorNode,
+                                aggProcessorNode,
                                 builder);
-
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index cc335e1383d..73c6174b27b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -244,9 +244,9 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> 
implements KGroupedS
                                          final String functionName,
                                          final KeyValueStoreMaterializer<K, T> 
storeFactory) {
 
-        return aggregateBuilder.build(
+        return aggregateBuilder.buildNonWindowed(
             new NamedInternal(functionName),
-            storeFactory,
+            storeFactory.storeName(),
             aggregateSupplier,
             storeFactory.queryableStoreName(),
             storeFactory.keySerde(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7beaa1abffb..94e0e9a0e36 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -1231,8 +1231,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         }
 
         final String name = new NamedInternal(named).name();
-        final ProcessorToStateConnectorNode<? super K, ? super V> processNode 
= new
-            ProcessorToStateConnectorNode<>(
+        final ProcessorToStateConnectorNode<? super K, ? super V> processNode 
= new ProcessorToStateConnectorNode<>(
             name,
             new ProcessorParameters<>(processorSupplier, name),
             stateStoreNames);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 989984d42f8..0c0f557b5c9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -110,10 +110,12 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
 
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         final StoreFactory storeFactory = new 
SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy);
+        final long gracePeriod = windows.gracePeriodMs() + 
windows.inactivityGap();
 
-        return aggregateBuilder.build(
+        return aggregateBuilder.buildWindowed(
             new NamedInternal(aggregateName),
-            storeFactory,
+            storeFactory.storeName(),
+            gracePeriod,
             new KStreamSessionWindowAggregate<>(
                 windows,
                 storeFactory,
@@ -162,10 +164,12 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
 
         final String reduceName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
         final StoreFactory storeFactory = new 
SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy);
+        final long gracePeriod = windows.gracePeriodMs() + 
windows.inactivityGap();
 
-        return aggregateBuilder.build(
+        return aggregateBuilder.buildWindowed(
             new NamedInternal(reduceName),
-            storeFactory,
+            storeFactory.storeName(),
+            gracePeriod,
             new KStreamSessionWindowAggregate<>(
                 windows,
                 storeFactory,
@@ -222,10 +226,12 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
 
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         final StoreFactory storeFactory = new 
SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy);
+        final long gracePeriod = windows.gracePeriodMs() + 
windows.inactivityGap();
 
-        return aggregateBuilder.build(
+        return aggregateBuilder.buildWindowed(
             new NamedInternal(aggregateName),
-            storeFactory,
+            storeFactory.storeName(),
+            gracePeriod,
             new KStreamSessionWindowAggregate<>(
                 windows,
                 storeFactory,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
index 16b2d0185ae..c2af4652f8f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
@@ -93,9 +93,10 @@ public class SlidingWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         final StoreFactory storeFactory = new 
SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
 
-        return aggregateBuilder.build(
+        return aggregateBuilder.buildWindowed(
                 new NamedInternal(aggregateName),
-                storeFactory,
+                storeFactory.storeName(),
+                windows.gracePeriodMs(),
                 new KStreamSlidingWindowAggregate<>(windows, storeFactory, 
emitStrategy, aggregateBuilder.countInitializer, 
aggregateBuilder.countAggregator),
                 materializedInternal.queryableStoreName(),
                 materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), 
windows.timeDifferenceMs()) : null,
@@ -139,9 +140,10 @@ public class SlidingWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         final StoreFactory storeFactory = new 
SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
 
-        return aggregateBuilder.build(
+        return aggregateBuilder.buildWindowed(
                 new NamedInternal(aggregateName),
-                storeFactory,
+                storeFactory.storeName(),
+                windows.gracePeriodMs(),
                 new KStreamSlidingWindowAggregate<>(windows, storeFactory, 
emitStrategy, initializer, aggregator),
                 materializedInternal.queryableStoreName(),
                 materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), 
windows.timeDifferenceMs()) : null,
@@ -186,9 +188,10 @@ public class SlidingWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         final String reduceName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
         final StoreFactory storeFactory = new 
SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
 
-        return aggregateBuilder.build(
+        return aggregateBuilder.buildWindowed(
                 new NamedInternal(reduceName),
-                storeFactory,
+                storeFactory.storeName(),
+                windows.gracePeriodMs(),
                 new KStreamSlidingWindowAggregate<>(windows, storeFactory, 
emitStrategy, aggregateBuilder.reduceInitializer, 
aggregatorForReducer(reducer)),
                 materializedInternal.queryableStoreName(),
                 materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), 
windows.timeDifferenceMs()) : null,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 80a671abdc5..5240f6f0ef0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -105,9 +105,10 @@ public class TimeWindowedKStreamImpl<K, V, W extends 
Window> extends AbstractStr
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         final StoreFactory storeFactory = new 
WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
 
-        return aggregateBuilder.build(
+        return aggregateBuilder.buildWindowed(
             new NamedInternal(aggregateName),
-            storeFactory,
+            storeFactory.storeName(),
+            windows.gracePeriodMs(),
             new KStreamWindowAggregate<>(
                 windows,
                 storeFactory,
@@ -158,9 +159,10 @@ public class TimeWindowedKStreamImpl<K, V, W extends 
Window> extends AbstractStr
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         final StoreFactory storeFactory = new 
WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
 
-        return aggregateBuilder.build(
+        return aggregateBuilder.buildWindowed(
             new NamedInternal(aggregateName),
-            storeFactory,
+            storeFactory.storeName(),
+            windows.gracePeriodMs(),
             new KStreamWindowAggregate<>(
                 windows,
                 storeFactory,
@@ -210,9 +212,10 @@ public class TimeWindowedKStreamImpl<K, V, W extends 
Window> extends AbstractStr
         final String reduceName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
         final StoreFactory storeFactory = new 
WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
 
-        return aggregateBuilder.build(
+        return aggregateBuilder.buildWindowed(
             new NamedInternal(reduceName),
-            storeFactory,
+            storeFactory.storeName(),
+            windows.gracePeriodMs(),
             new KStreamWindowAggregate<>(
                 windows,
                 storeFactory,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GracePeriodGraphNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GracePeriodGraphNode.java
new file mode 100644
index 00000000000..c6ed537fd0b
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GracePeriodGraphNode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+/**
+ * Represents a stateful {@link ProcessorGraphNode} where a semantic grace 
period is defined for the processor
+ * and its state.
+ */
+public class GracePeriodGraphNode<K, V> extends ProcessorGraphNode<K, V> {
+
+    private final long gracePeriod;
+
+    public GracePeriodGraphNode(final String nodeName,
+                                final ProcessorParameters<K, V, ?, ?> 
processorParameters,
+                                final long gracePeriod) {
+        super(nodeName, processorParameters);
+        this.gracePeriod = gracePeriod;
+    }
+
+    public long gracePeriod() {
+        return gracePeriod;
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
index d58b3a3b1a9..09ed36284a8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
@@ -17,13 +17,6 @@
 package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.errors.TopologyException;
-import org.apache.kafka.streams.kstream.SessionWindows;
-import org.apache.kafka.streams.kstream.SlidingWindows;
-import org.apache.kafka.streams.kstream.Windows;
-import 
org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
-import 
org.apache.kafka.streams.kstream.internals.KStreamSlidingWindowAggregate;
-import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
-import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 
 public final class GraphGraceSearchUtil {
     private GraphGraceSearchUtil() {}
@@ -32,6 +25,7 @@ public final class GraphGraceSearchUtil {
         return findAndVerifyWindowGrace(graphNode, "");
     }
 
+    @SuppressWarnings("rawtypes")
     private static long findAndVerifyWindowGrace(final GraphNode graphNode, 
final String chain) {
         // error base case: we traversed off the end of the graph without 
finding a window definition
         if (graphNode == null) {
@@ -40,11 +34,8 @@ public final class GraphGraceSearchUtil {
             );
         }
         // base case: return if this node defines a grace period.
-        {
-            final Long gracePeriod = extractGracePeriod(graphNode);
-            if (gracePeriod != null) {
-                return gracePeriod;
-            }
+        if (graphNode instanceof GracePeriodGraphNode) {
+            return ((GracePeriodGraphNode) graphNode).gracePeriod();
         }
 
         final String newChain = chain.equals("") ? graphNode.nodeName() : 
graphNode.nodeName() + "->" + chain;
@@ -70,27 +61,4 @@ public final class GraphGraceSearchUtil {
         return inheritedGrace;
     }
 
-    @SuppressWarnings("rawtypes")
-    private static Long extractGracePeriod(final GraphNode node) {
-        if (node instanceof ProcessorGraphNode) {
-            final ProcessorSupplier processorSupplier = ((ProcessorGraphNode) 
node).processorParameters().processorSupplier();
-            if (processorSupplier instanceof KStreamWindowAggregate) {
-                final KStreamWindowAggregate kStreamWindowAggregate = 
(KStreamWindowAggregate) processorSupplier;
-                final Windows windows = kStreamWindowAggregate.windows();
-                return windows.gracePeriodMs();
-            } else if (processorSupplier instanceof 
KStreamSessionWindowAggregate) {
-                final KStreamSessionWindowAggregate 
kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) 
processorSupplier;
-                final SessionWindows windows = 
kStreamSessionWindowAggregate.windows();
-                return windows.gracePeriodMs() + windows.inactivityGap();
-            } else if (processorSupplier instanceof 
KStreamSlidingWindowAggregate) {
-                final KStreamSlidingWindowAggregate 
kStreamSlidingWindowAggregate = (KStreamSlidingWindowAggregate) 
processorSupplier;
-                final SlidingWindows windows = 
kStreamSlidingWindowAggregate.windows();
-                return windows.gracePeriodMs();
-            } else {
-                return null;
-            }
-        } else {
-            return null;
-        }
-    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 4c37fa2f5cf..5c52187bab2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -811,7 +811,7 @@ public class SuppressScenarioTest {
     }
 
     @Test
-    public void shouldWorkWithCogrouped() {
+    public void shouldWorkWithCogroupedTimeWindows() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KGroupedStream<String, String> stream1 = builder.stream("one", 
Consumed.with(Serdes.String(), 
Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
@@ -823,6 +823,32 @@ public class SuppressScenarioTest {
             .toStream();
     }
 
+    @Test
+    public void shouldWorkWithCogroupedSlidingWindows() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KGroupedStream<String, String> stream1 = builder.stream("one", 
Consumed.with(Serdes.String(), 
Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
+        final KGroupedStream<String, String> stream2 = builder.stream("two", 
Consumed.with(Serdes.String(), 
Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
+        final KStream<Windowed<String>, Object> cogrouped = 
stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, 
(key, value, aggregate) -> aggregate + value)
+            
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(15)))
+            .aggregate(() -> "", Named.as("test"), Materialized.as("store"))
+            .suppress(Suppressed.untilWindowCloses(unbounded()))
+            .toStream();
+    }
+
+    @Test
+    public void shouldWorkWithCogroupedSessionWindows() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KGroupedStream<String, String> stream1 = builder.stream("one", 
Consumed.with(Serdes.String(), 
Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
+        final KGroupedStream<String, String> stream2 = builder.stream("two", 
Consumed.with(Serdes.String(), 
Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
+        final KStream<Windowed<String>, Object> cogrouped = 
stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, 
(key, value, aggregate) -> aggregate + value)
+            
.windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(15), 
Duration.ofMinutes(5)))
+            .aggregate(() -> "", (k, v1, v2) -> "", Named.as("test"), 
Materialized.as("store"))
+            .suppress(Suppressed.untilWindowCloses(unbounded()))
+            .toStream();
+    }
+
     private static <K, V> void verify(final List<TestRecord<K, V>> results,
                                       final List<KeyValueTimestamp<K, V>> 
expectedResults) {
         if (results.size() != expectedResults.size()) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 5db1439835c..6ed7dea0fb3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -87,7 +87,7 @@ public class GraphGraceSearchUtilTest {
     @Test
     public void shouldExtractGraceFromKStreamWindowAggregateNode() {
         final TimeWindows windows = TimeWindows.ofSizeAndGrace(ofMillis(10L), 
ofMillis(1234L));
-        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
+        final ProcessorGraphNode<String, Long> node = new 
GracePeriodGraphNode<>(
             "asdf",
             new ProcessorParameters<>(
                 new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
@@ -98,18 +98,19 @@ public class GraphGraceSearchUtilTest {
                     null
                 ),
                 "asdf"
-            )
+            ),
+            windows.gracePeriodMs()
         );
 
         final long extracted = 
GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-        assertThat(extracted, is(windows.gracePeriodMs()));
+        assertThat(extracted, is(1234L));
     }
 
     @Test
     public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() {
         final SessionWindows windows = 
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
 
-        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
+        final ProcessorGraphNode<String, Long> node = new 
GracePeriodGraphNode<>(
             "asdf",
             new ProcessorParameters<>(
                 new KStreamSessionWindowAggregate<String, Long, Integer>(
@@ -121,21 +122,23 @@ public class GraphGraceSearchUtilTest {
                     null
                 ),
                 "asdf"
-            )
+            ),
+            windows.gracePeriodMs() + windows.inactivityGap()
         );
 
         final long extracted = 
GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-        assertThat(extracted, is(windows.gracePeriodMs() + 
windows.inactivityGap()));
+        assertThat(extracted, is(1244L));
     }
 
     @Test
     public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() {
         final SessionWindows windows = 
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
-        final ProcessorGraphNode<String, Long> graceGrandparent = new 
ProcessorGraphNode<>(
+        final ProcessorGraphNode<String, Long> graceGrandparent = new 
GracePeriodGraphNode<>(
             "asdf",
             new ProcessorParameters<>(new 
KStreamSessionWindowAggregate<String, Long, Integer>(
                 windows, mockStoreFactory("asdf"), 
EmitStrategy.onWindowUpdate(), null, null, null
-            ), "asdf")
+            ), "asdf"),
+            windows.gracePeriodMs() + windows.inactivityGap()
         );
 
         final ProcessorGraphNode<String, Long> statefulParent = new 
ProcessorGraphNode<>(
@@ -167,13 +170,13 @@ public class GraphGraceSearchUtilTest {
         statefulParent.addChild(node);
 
         final long extracted = 
GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-        assertThat(extracted, is(windows.gracePeriodMs() + 
windows.inactivityGap()));
+        assertThat(extracted, is(1244L));
     }
 
     @Test
     public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() {
         final SessionWindows windows = 
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
-        final ProcessorGraphNode<String, Long> graceGrandparent = new 
ProcessorGraphNode<>(
+        final ProcessorGraphNode<String, Long> graceGrandparent = new 
GracePeriodGraphNode<>(
             "asdf",
             new ProcessorParameters<>(
                 new KStreamSessionWindowAggregate<String, Long, Integer>(
@@ -185,7 +188,8 @@ public class GraphGraceSearchUtilTest {
                     null
                 ),
                 "asdf"
-            )
+            ),
+            windows.gracePeriodMs() + windows.inactivityGap()
         );
 
         final ProcessorGraphNode<String, Long> statelessParent = new 
ProcessorGraphNode<>(
@@ -217,16 +221,17 @@ public class GraphGraceSearchUtilTest {
         statelessParent.addChild(node);
 
         final long extracted = 
GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-        assertThat(extracted, is(windows.gracePeriodMs() + 
windows.inactivityGap()));
+        assertThat(extracted, is(1244L));
     }
 
     @Test
     public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() {
-        final ProcessorGraphNode<String, Long> leftParent = new 
ProcessorGraphNode<>(
+        final SessionWindows leftWindows = 
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
+        final ProcessorGraphNode<String, Long> leftParent = new 
GracePeriodGraphNode<>(
             "asdf",
             new ProcessorParameters<>(
                 new KStreamSessionWindowAggregate<String, Long, Integer>(
-                    SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), 
ofMillis(1234L)),
+                    leftWindows,
                     mockStoreFactory("asdf"),
                     EmitStrategy.onWindowUpdate(),
                     null,
@@ -234,21 +239,24 @@ public class GraphGraceSearchUtilTest {
                     null
                 ),
                 "asdf"
-            )
+            ),
+            leftWindows.gracePeriodMs() + leftWindows.inactivityGap()
         );
 
-        final ProcessorGraphNode<String, Long> rightParent = new 
ProcessorGraphNode<>(
+        final TimeWindows rightWindows = 
TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L));
+        final ProcessorGraphNode<String, Long> rightParent = new 
GracePeriodGraphNode<>(
             "asdf",
             new ProcessorParameters<>(
                 new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
-                    TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)),
+                    rightWindows,
                     mockStoreFactory("asdf"),
                     EmitStrategy.onWindowUpdate(),
                     null,
                     null
                 ),
                 "asdf"
-            )
+            ),
+            rightWindows.gracePeriodMs()
         );
 
         final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(


Reply via email to