This is an automated email from the ASF dual-hosted git repository. ableegoldman pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c7d3999d4f5 KAFKA-18026: KIP-1112, clean up graph node grace period resolution (#18342) c7d3999d4f5 is described below commit c7d3999d4f5f6cbe7bb2be60155ee6e607a7152d Author: A. Sophie Blee-Goldman <ableegold...@gmail.com> AuthorDate: Wed Jan 15 18:01:19 2025 -0800 KAFKA-18026: KIP-1112, clean up graph node grace period resolution (#18342) Minor followup to #18195 that I split out into a separate PR since that one was getting a bit long. Should be rebased & reviewed after that one is merged. Introduces a new class for windowed graph nodes with a grace period defined to improve (slightly) the type safety Reviewers: Guozhang Wang <guozhang.wang...@gmail.com>, Almog Gavra <al...@responsive.dev> --- .../internals/CogroupedStreamAggregateBuilder.java | 23 ++++--- .../internals/GroupedStreamAggregateBuilder.java | 76 ++++++++++++++++------ .../kstream/internals/KGroupedStreamImpl.java | 4 +- .../streams/kstream/internals/KStreamImpl.java | 3 +- .../internals/SessionWindowedKStreamImpl.java | 18 +++-- .../internals/SlidingWindowedKStreamImpl.java | 15 +++-- .../kstream/internals/TimeWindowedKStreamImpl.java | 15 +++-- .../internals/graph/GracePeriodGraphNode.java | 38 +++++++++++ .../internals/graph/GraphGraceSearchUtil.java | 38 +---------- .../kstream/internals/SuppressScenarioTest.java | 28 +++++++- .../internals/graph/GraphGraceSearchUtilTest.java | 44 ++++++++----- 11 files changed, 197 insertions(+), 105 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java index 1ce1e7451a7..a450f8ead1a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windows; +import org.apache.kafka.streams.kstream.internals.graph.GracePeriodGraphNode; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; @@ -110,10 +111,11 @@ class CogroupedStreamAggregateBuilder<K, VOut> { "-cogroup-agg-" + counter++, builder, CogroupedKStreamImpl.AGGREGATE_NAME); - final ProcessorGraphNode<K, ?> aggProcessorNode = - new ProcessorGraphNode<>( + final GracePeriodGraphNode<K, ?> aggProcessorNode = + new GracePeriodGraphNode<>( kStreamAggProcessorName, - new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName), + windows.gracePeriodMs() ); processors.add(aggProcessorNode); builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); @@ -149,10 +151,12 @@ class CogroupedStreamAggregateBuilder<K, VOut> { "-cogroup-agg-" + counter++, builder, CogroupedKStreamImpl.AGGREGATE_NAME); - final ProcessorGraphNode<K, ?> aggProcessorNode = - new ProcessorGraphNode<>( + final long gracePeriod = sessionWindows.gracePeriodMs() + sessionWindows.inactivityGap(); + final GracePeriodGraphNode<K, ?> aggProcessorNode = + new GracePeriodGraphNode<>( kStreamAggProcessorName, - new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName), + gracePeriod ); processors.add(aggProcessorNode); builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); @@ -187,10 +191,11 @@ class CogroupedStreamAggregateBuilder<K, VOut> { "-cogroup-agg-" + counter++, builder, CogroupedKStreamImpl.AGGREGATE_NAME); - final ProcessorGraphNode<K, ?> aggProcessorNode = - new ProcessorGraphNode<>( + final GracePeriodGraphNode<K, ?> aggProcessorNode = + new GracePeriodGraphNode<>( kStreamAggProcessorName, - new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName), + slidingWindows.gracePeriodMs() ); processors.add(aggProcessorNode); builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index bc84e69a1ff..b99034c5306 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -20,10 +20,10 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.internals.graph.GracePeriodGraphNode; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; -import org.apache.kafka.streams.processor.internals.StoreFactory; import java.util.Collections; import java.util.Set; @@ -66,23 +66,67 @@ class GroupedStreamAggregateBuilder<K, V> { this.userProvidedRepartitionTopicName = groupedInternal.name(); } - <KR, VR> KTable<KR, VR> build(final NamedInternal functionName, - final StoreFactory storeFactory, - final KStreamAggProcessorSupplier<K, V, KR, VR> aggregateSupplier, - final String queryableStoreName, - final Serde<KR> keySerde, - final Serde<VR> valueSerde, - final boolean isOutputVersioned) { - assert queryableStoreName == null || queryableStoreName.equals(storeFactory.storeName()); + <KR, VR> KTable<KR, VR> buildNonWindowed(final NamedInternal functionName, + final String storeName, + final KStreamAggProcessorSupplier<K, V, KR, VR> aggregateSupplier, + final String queryableStoreName, + final Serde<KR> keySerde, + final Serde<VR> valueSerde, + final boolean isOutputVersioned) { + final String aggFunctionName = functionName.name(); + + final ProcessorGraphNode<K, V> aggProcessorNode = + new ProcessorGraphNode<>( + aggFunctionName, + new ProcessorParameters<>(aggregateSupplier, aggFunctionName) + ); + aggProcessorNode.setOutputVersioned(isOutputVersioned); + + return build(aggFunctionName, storeName, aggregateSupplier, aggProcessorNode, queryableStoreName, keySerde, valueSerde); + } + + <KR, VR> KTable<KR, VR> buildWindowed(final NamedInternal functionName, + final String storeName, + final long gracePeriod, + final KStreamAggProcessorSupplier<K, V, KR, VR> aggregateSupplier, + final String queryableStoreName, + final Serde<KR> keySerde, + final Serde<VR> valueSerde, + final boolean isOutputVersioned) { final String aggFunctionName = functionName.name(); + final GracePeriodGraphNode<K, V> gracePeriodAggProcessorNode = + new GracePeriodGraphNode<>( + aggFunctionName, + new ProcessorParameters<>(aggregateSupplier, aggFunctionName), + gracePeriod + ); + + gracePeriodAggProcessorNode.setOutputVersioned(isOutputVersioned); + + return build(aggFunctionName, storeName, aggregateSupplier, gracePeriodAggProcessorNode, queryableStoreName, keySerde, valueSerde); + } + + private <KR, VR> KTable<KR, VR> build(final String aggFunctionName, + final String storeName, + final KStreamAggProcessorSupplier<K, V, KR, VR> aggregateSupplier, + final ProcessorGraphNode<K, V> aggProcessorNode, + final String queryableStoreName, + final Serde<KR> keySerde, + final Serde<VR> valueSerde) { + if (!(queryableStoreName == null || queryableStoreName.equals(storeName))) { + throw new IllegalStateException(String.format("queryableStoreName should be null or equal to storeName" + + " but got storeName='%s' and queryableStoreName='%s'", + storeName, queryableStoreName)); + } + String sourceName = this.name; GraphNode parentNode = graphNode; if (repartitionRequired) { final OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = optimizableRepartitionNodeBuilder(); - final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeFactory.storeName(); + final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeName; sourceName = createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder); // First time through we need to create a repartition node. @@ -97,14 +141,7 @@ class GroupedStreamAggregateBuilder<K, V> { parentNode = repartitionNode; } - final ProcessorGraphNode<K, V> statefulProcessorNode = - new ProcessorGraphNode<>( - aggFunctionName, - new ProcessorParameters<>(aggregateSupplier, aggFunctionName) - ); - statefulProcessorNode.setOutputVersioned(isOutputVersioned); - - builder.addGraphNode(parentNode, statefulProcessorNode); + builder.addGraphNode(parentNode, aggProcessorNode); return new KTableImpl<>(aggFunctionName, keySerde, @@ -112,9 +149,8 @@ class GroupedStreamAggregateBuilder<K, V> { sourceName.equals(this.name) ? subTopologySourceNodes : Collections.singleton(sourceName), queryableStoreName, aggregateSupplier, - statefulProcessorNode, + aggProcessorNode, builder); - } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index cc335e1383d..73c6174b27b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -244,9 +244,9 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS final String functionName, final KeyValueStoreMaterializer<K, T> storeFactory) { - return aggregateBuilder.build( + return aggregateBuilder.buildNonWindowed( new NamedInternal(functionName), - storeFactory, + storeFactory.storeName(), aggregateSupplier, storeFactory.queryableStoreName(), storeFactory.keySerde(), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 7beaa1abffb..94e0e9a0e36 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -1231,8 +1231,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } final String name = new NamedInternal(named).name(); - final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new - ProcessorToStateConnectorNode<>( + final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>( name, new ProcessorParameters<>(processorSupplier, name), stateStoreNames); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index 989984d42f8..0c0f557b5c9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -110,10 +110,12 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final StoreFactory storeFactory = new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy); + final long gracePeriod = windows.gracePeriodMs() + windows.inactivityGap(); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(aggregateName), - storeFactory, + storeFactory.storeName(), + gracePeriod, new KStreamSessionWindowAggregate<>( windows, storeFactory, @@ -162,10 +164,12 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); final StoreFactory storeFactory = new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy); + final long gracePeriod = windows.gracePeriodMs() + windows.inactivityGap(); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(reduceName), - storeFactory, + storeFactory.storeName(), + gracePeriod, new KStreamSessionWindowAggregate<>( windows, storeFactory, @@ -222,10 +226,12 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final StoreFactory storeFactory = new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy); + final long gracePeriod = windows.gracePeriodMs() + windows.inactivityGap(); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(aggregateName), - storeFactory, + storeFactory.storeName(), + gracePeriod, new KStreamSessionWindowAggregate<>( windows, storeFactory, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java index 16b2d0185ae..c2af4652f8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java @@ -93,9 +93,10 @@ public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(aggregateName), - storeFactory, + storeFactory.storeName(), + windows.gracePeriodMs(), new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), materializedInternal.queryableStoreName(), materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null, @@ -139,9 +140,10 @@ public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(aggregateName), - storeFactory, + storeFactory.storeName(), + windows.gracePeriodMs(), new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, initializer, aggregator), materializedInternal.queryableStoreName(), materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null, @@ -186,9 +188,10 @@ public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(reduceName), - storeFactory, + storeFactory.storeName(), + windows.gracePeriodMs(), new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)), materializedInternal.queryableStoreName(), materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index 80a671abdc5..5240f6f0ef0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -105,9 +105,10 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(aggregateName), - storeFactory, + storeFactory.storeName(), + windows.gracePeriodMs(), new KStreamWindowAggregate<>( windows, storeFactory, @@ -158,9 +159,10 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(aggregateName), - storeFactory, + storeFactory.storeName(), + windows.gracePeriodMs(), new KStreamWindowAggregate<>( windows, storeFactory, @@ -210,9 +212,10 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(reduceName), - storeFactory, + storeFactory.storeName(), + windows.gracePeriodMs(), new KStreamWindowAggregate<>( windows, storeFactory, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GracePeriodGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GracePeriodGraphNode.java new file mode 100644 index 00000000000..c6ed537fd0b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GracePeriodGraphNode.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals.graph; + +/** + * Represents a stateful {@link ProcessorGraphNode} where a semantic grace period is defined for the processor + * and its state. + */ +public class GracePeriodGraphNode<K, V> extends ProcessorGraphNode<K, V> { + + private final long gracePeriod; + + public GracePeriodGraphNode(final String nodeName, + final ProcessorParameters<K, V, ?, ?> processorParameters, + final long gracePeriod) { + super(nodeName, processorParameters); + this.gracePeriod = gracePeriod; + } + + public long gracePeriod() { + return gracePeriod; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java index d58b3a3b1a9..09ed36284a8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java @@ -17,13 +17,6 @@ package org.apache.kafka.streams.kstream.internals.graph; import org.apache.kafka.streams.errors.TopologyException; -import org.apache.kafka.streams.kstream.SessionWindows; -import org.apache.kafka.streams.kstream.SlidingWindows; -import org.apache.kafka.streams.kstream.Windows; -import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate; -import org.apache.kafka.streams.kstream.internals.KStreamSlidingWindowAggregate; -import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate; -import org.apache.kafka.streams.processor.api.ProcessorSupplier; public final class GraphGraceSearchUtil { private GraphGraceSearchUtil() {} @@ -32,6 +25,7 @@ public final class GraphGraceSearchUtil { return findAndVerifyWindowGrace(graphNode, ""); } + @SuppressWarnings("rawtypes") private static long findAndVerifyWindowGrace(final GraphNode graphNode, final String chain) { // error base case: we traversed off the end of the graph without finding a window definition if (graphNode == null) { @@ -40,11 +34,8 @@ public final class GraphGraceSearchUtil { ); } // base case: return if this node defines a grace period. - { - final Long gracePeriod = extractGracePeriod(graphNode); - if (gracePeriod != null) { - return gracePeriod; - } + if (graphNode instanceof GracePeriodGraphNode) { + return ((GracePeriodGraphNode) graphNode).gracePeriod(); } final String newChain = chain.equals("") ? graphNode.nodeName() : graphNode.nodeName() + "->" + chain; @@ -70,27 +61,4 @@ public final class GraphGraceSearchUtil { return inheritedGrace; } - @SuppressWarnings("rawtypes") - private static Long extractGracePeriod(final GraphNode node) { - if (node instanceof ProcessorGraphNode) { - final ProcessorSupplier processorSupplier = ((ProcessorGraphNode) node).processorParameters().processorSupplier(); - if (processorSupplier instanceof KStreamWindowAggregate) { - final KStreamWindowAggregate kStreamWindowAggregate = (KStreamWindowAggregate) processorSupplier; - final Windows windows = kStreamWindowAggregate.windows(); - return windows.gracePeriodMs(); - } else if (processorSupplier instanceof KStreamSessionWindowAggregate) { - final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) processorSupplier; - final SessionWindows windows = kStreamSessionWindowAggregate.windows(); - return windows.gracePeriodMs() + windows.inactivityGap(); - } else if (processorSupplier instanceof KStreamSlidingWindowAggregate) { - final KStreamSlidingWindowAggregate kStreamSlidingWindowAggregate = (KStreamSlidingWindowAggregate) processorSupplier; - final SlidingWindows windows = kStreamSlidingWindowAggregate.windows(); - return windows.gracePeriodMs(); - } else { - return null; - } - } else { - return null; - } - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java index 4c37fa2f5cf..5c52187bab2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -811,7 +811,7 @@ public class SuppressScenarioTest { } @Test - public void shouldWorkWithCogrouped() { + public void shouldWorkWithCogroupedTimeWindows() { final StreamsBuilder builder = new StreamsBuilder(); final KGroupedStream<String, String> stream1 = builder.stream("one", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String())); @@ -823,6 +823,32 @@ public class SuppressScenarioTest { .toStream(); } + @Test + public void shouldWorkWithCogroupedSlidingWindows() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KGroupedStream<String, String> stream1 = builder.stream("one", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String())); + final KGroupedStream<String, String> stream2 = builder.stream("two", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String())); + final KStream<Windowed<String>, Object> cogrouped = stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value) + .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(15))) + .aggregate(() -> "", Named.as("test"), Materialized.as("store")) + .suppress(Suppressed.untilWindowCloses(unbounded())) + .toStream(); + } + + @Test + public void shouldWorkWithCogroupedSessionWindows() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KGroupedStream<String, String> stream1 = builder.stream("one", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String())); + final KGroupedStream<String, String> stream2 = builder.stream("two", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String())); + final KStream<Windowed<String>, Object> cogrouped = stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value) + .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(15), Duration.ofMinutes(5))) + .aggregate(() -> "", (k, v1, v2) -> "", Named.as("test"), Materialized.as("store")) + .suppress(Suppressed.untilWindowCloses(unbounded())) + .toStream(); + } + private static <K, V> void verify(final List<TestRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) { if (results.size() != expectedResults.size()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java index 5db1439835c..6ed7dea0fb3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java @@ -87,7 +87,7 @@ public class GraphGraceSearchUtilTest { @Test public void shouldExtractGraceFromKStreamWindowAggregateNode() { final TimeWindows windows = TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(1234L)); - final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>( + final ProcessorGraphNode<String, Long> node = new GracePeriodGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamWindowAggregate<String, Long, Integer, TimeWindow>( @@ -98,18 +98,19 @@ public class GraphGraceSearchUtilTest { null ), "asdf" - ) + ), + windows.gracePeriodMs() ); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); - assertThat(extracted, is(windows.gracePeriodMs())); + assertThat(extracted, is(1234L)); } @Test public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() { final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); - final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>( + final ProcessorGraphNode<String, Long> node = new GracePeriodGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamSessionWindowAggregate<String, Long, Integer>( @@ -121,21 +122,23 @@ public class GraphGraceSearchUtilTest { null ), "asdf" - ) + ), + windows.gracePeriodMs() + windows.inactivityGap() ); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); - assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap())); + assertThat(extracted, is(1244L)); } @Test public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() { final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); - final ProcessorGraphNode<String, Long> graceGrandparent = new ProcessorGraphNode<>( + final ProcessorGraphNode<String, Long> graceGrandparent = new GracePeriodGraphNode<>( "asdf", new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>( windows, mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null, null - ), "asdf") + ), "asdf"), + windows.gracePeriodMs() + windows.inactivityGap() ); final ProcessorGraphNode<String, Long> statefulParent = new ProcessorGraphNode<>( @@ -167,13 +170,13 @@ public class GraphGraceSearchUtilTest { statefulParent.addChild(node); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); - assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap())); + assertThat(extracted, is(1244L)); } @Test public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() { final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); - final ProcessorGraphNode<String, Long> graceGrandparent = new ProcessorGraphNode<>( + final ProcessorGraphNode<String, Long> graceGrandparent = new GracePeriodGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamSessionWindowAggregate<String, Long, Integer>( @@ -185,7 +188,8 @@ public class GraphGraceSearchUtilTest { null ), "asdf" - ) + ), + windows.gracePeriodMs() + windows.inactivityGap() ); final ProcessorGraphNode<String, Long> statelessParent = new ProcessorGraphNode<>( @@ -217,16 +221,17 @@ public class GraphGraceSearchUtilTest { statelessParent.addChild(node); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); - assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap())); + assertThat(extracted, is(1244L)); } @Test public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() { - final ProcessorGraphNode<String, Long> leftParent = new ProcessorGraphNode<>( + final SessionWindows leftWindows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); + final ProcessorGraphNode<String, Long> leftParent = new GracePeriodGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamSessionWindowAggregate<String, Long, Integer>( - SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)), + leftWindows, mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, @@ -234,21 +239,24 @@ public class GraphGraceSearchUtilTest { null ), "asdf" - ) + ), + leftWindows.gracePeriodMs() + leftWindows.inactivityGap() ); - final ProcessorGraphNode<String, Long> rightParent = new ProcessorGraphNode<>( + final TimeWindows rightWindows = TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)); + final ProcessorGraphNode<String, Long> rightParent = new GracePeriodGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamWindowAggregate<String, Long, Integer, TimeWindow>( - TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)), + rightWindows, mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null ), "asdf" - ) + ), + rightWindows.gracePeriodMs() ); final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(