This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 09e8fa2dbe0 KAFKA-18026: KIP-1112, migrate stream-table joins to use ProcessorSupplier#stores (#18047)
09e8fa2dbe0 is described below

commit 09e8fa2dbe0517fcb08d756f67dfd4061f275a71
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Dec 5 10:06:11 2024 -0800

    KAFKA-18026: KIP-1112, migrate stream-table joins to use ProcessorSupplier#stores (#18047)
    
    Covers wrapping of processors and state stores for KStream-KTable joins
    
    Reviewers: Almog Gavra <[email protected]>, Guozhang Wang <[email protected]>
---
 .../streams/kstream/internals/KStreamImpl.java     | 18 +++----
 .../kstream/internals/KStreamKTableJoin.java       | 21 ++++++--
 .../internals/graph/StreamTableJoinNode.java       | 12 +----
 .../apache/kafka/streams/StreamsBuilderTest.java   | 60 ++++++++++++++++++++++
 4 files changed, 88 insertions(+), 23 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index b650724055b..820c31f29e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -61,8 +61,8 @@ import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
 import 
org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer;
 
@@ -1127,7 +1127,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
             leftJoin);
         final ProcessorParameters<K, V, ?, ?> processorParameters = new 
ProcessorParameters<>(processorSupplier, name);
         final StreamTableJoinNode<K, V> streamTableJoinNode =
-            new StreamTableJoinNode<>(name, processorParameters, new String[] 
{}, null, null, Optional.empty());
+            new StreamTableJoinNode<>(name, processorParameters, new String[] 
{}, null, null);
 
         if (leftJoin) {
             
streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
@@ -1159,16 +1159,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin 
? LEFTJOIN_NAME : JOIN_NAME);
 
-        Optional<String> bufferStoreName = Optional.empty();
+        Optional<StoreBuilder<?>> bufferStoreBuilder = Optional.empty();
 
         if (joinedInternal.gracePeriod() != null) {
             if (!((KTableImpl<K, ?, VO>) 
table).graphNode.isOutputVersioned().orElse(true)) {
                 throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
             }
-            bufferStoreName = Optional.of(name + "-Buffer");
-            final RocksDBTimeOrderedKeyValueBuffer.Builder<Object, Object> 
storeBuilder =
-                    new 
RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), 
joinedInternal.gracePeriod(), name);
-            
builder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder));
+            final String bufferName = name + "-Buffer";
+            bufferStoreBuilder = Optional.of(new 
RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferName, 
joinedInternal.gracePeriod(), name));
         }
 
         final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new 
KStreamKTableJoin<>(
@@ -1176,7 +1174,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
             joiner,
             leftJoin,
             Optional.ofNullable(joinedInternal.gracePeriod()),
-            bufferStoreName);
+            bufferStoreBuilder
+        );
 
         final ProcessorParameters<K, V, ?, ?> processorParameters = new 
ProcessorParameters<>(processorSupplier, name);
         final StreamTableJoinNode<K, V> streamTableJoinNode = new 
StreamTableJoinNode<>(
@@ -1184,8 +1183,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
             processorParameters,
             ((KTableImpl<K, ?, VO>) table).valueGetterSupplier().storeNames(),
             this.name,
-            joinedInternal.gracePeriod(),
-            bufferStoreName
+            joinedInternal.gracePeriod()
         );
 
         builder.addGraphNode(graphNode, streamTableJoinNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
index d1ea36a470c..8fc775e2b9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
@@ -20,9 +20,13 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.time.Duration;
 import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.singleton;
 
 class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, 
K, VOut> {
 
@@ -32,18 +36,29 @@ class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
     private final boolean leftJoin;
     private final Optional<Duration> gracePeriod;
     private final Optional<String> storeName;
-
+    private final Set<StoreBuilder<?>> stores;
 
     KStreamKTableJoin(final KTableValueGetterSupplier<K, V2> 
valueGetterSupplier,
                       final ValueJoinerWithKey<? super K, ? super V1, ? super 
V2, VOut> joiner,
                       final boolean leftJoin,
                       final Optional<Duration> gracePeriod,
-                      final Optional<String> storeName) {
+                      final Optional<StoreBuilder<?>> bufferStoreBuilder) {
         this.valueGetterSupplier = valueGetterSupplier;
         this.joiner = joiner;
         this.leftJoin = leftJoin;
         this.gracePeriod = gracePeriod;
-        this.storeName = storeName;
+        this.storeName = bufferStoreBuilder.map(StoreBuilder::name);
+
+        if (bufferStoreBuilder.isEmpty()) {
+            this.stores = null;
+        } else {
+            this.stores = singleton(bufferStoreBuilder.get());
+        }
+    }
+
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return stores;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
index ad6083cbbcf..b73a8caaa55 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
@@ -17,12 +17,10 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.Optional;
 
 /**
  * Represents a join between a KStream and a KTable or GlobalKTable
@@ -34,15 +32,12 @@ public class StreamTableJoinNode<K, V> extends GraphNode {
     private final ProcessorParameters<K, V, ?, ?> processorParameters;
     private final String otherJoinSideNodeName;
     private final Duration gracePeriod;
-    private final Optional<String> bufferName;
-
 
     public StreamTableJoinNode(final String nodeName,
                                final ProcessorParameters<K, V, ?, ?> 
processorParameters,
                                final String[] storeNames,
                                final String otherJoinSideNodeName,
-                               final Duration gracePeriod,
-                               final Optional<String> bufferName) {
+                               final Duration gracePeriod) {
         super(nodeName);
 
         // in the case of Stream-Table join the state stores associated with 
the KTable
@@ -50,7 +45,6 @@ public class StreamTableJoinNode<K, V> extends GraphNode {
         this.processorParameters = processorParameters;
         this.otherJoinSideNodeName = otherJoinSideNodeName;
         this.gracePeriod = gracePeriod;
-        this.bufferName = bufferName;
     }
 
     @Override
@@ -65,15 +59,13 @@ public class StreamTableJoinNode<K, V> extends GraphNode {
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
         final String processorName = processorParameters.processorName();
-        final ProcessorSupplier<K, V, ?, ?> processorSupplier = 
processorParameters.processorSupplier();
 
         // Stream - Table join (Global or KTable)
-        topologyBuilder.addProcessor(processorName, processorSupplier, 
parentNodeNames());
+        processorParameters.addProcessorTo(topologyBuilder, parentNodeNames());
 
         // Steam - KTable join only
         if (otherJoinSideNodeName != null) {
             topologyBuilder.connectProcessorAndStateStores(processorName, 
storeNames);
-            bufferName.ifPresent(s -> 
topologyBuilder.connectProcessorAndStateStores(processorName, s));
             if (gracePeriod != null) {
                 for (final String storeName : storeNames) {
                     if (!topologyBuilder.isStoreVersioned(storeName)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 36559b18e29..0c9031afbdf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -1756,6 +1756,66 @@ public class StreamsBuilderTest {
         assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
     }
 
+    @Test
+    public void shouldWrapProcessorsForStreamTableJoin() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KStream<String, String> stream = builder.stream("input", 
Consumed.as("source-stream"));
+        final KTable<String, String> table = builder.table("input-table", 
Consumed.as("source-table"));
+
+        stream.join(
+                table,
+                MockValueJoiner.TOSTRING_JOINER,
+                Joined.as("st-join"))
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "source-table", "st-join"
+        ));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForStreamTableJoinWithGracePeriod() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KStream<String, String> stream = builder.stream("input", 
Consumed.as("source-stream"));
+        final KTable<String, String> table = builder.table(
+            "input-table",
+            Consumed.as("versioned-source-table"),
+            
Materialized.as(Stores.persistentVersionedKeyValueStore("table-store", 
Duration.ofDays(1)))
+        );
+
+        stream.join(
+            table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.<String, String, 
String>as("st-join").withGracePeriod(Duration.ofDays(1)))
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "versioned-source-table", "st-join"
+        ));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
+    }
+
     @Test
     public void shouldAllowStreamsFromSameTopic() {
         builder.stream("topic");

Reply via email to