This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ed10fc63a9f KAFKA-18026: supply stores for KTable#mapValues using ProcessorSupplier#stores (#18155)
ed10fc63a9f is described below

commit ed10fc63a9fd3032a529f7e10a0f863ee71b710a
Author: Rohan <[email protected]>
AuthorDate: Sat Dec 14 20:18:49 2024 -0800

    KAFKA-18026: supply stores for KTable#mapValues using ProcessorSupplier#stores (#18155)
    
    KAFKA-18026: supply stores for KTable#mapValues using ProcessorSupplier#stores
    
    Reviewers: Guozhang Wang <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>
---
 .../streams/kstream/internals/KTableImpl.java      | 11 ++---
 .../streams/kstream/internals/KTableMapValues.java | 18 ++++++-
 .../kstream/internals/graph/TableFilterNode.java   |  2 +-
 .../internals/graph/TableProcessorNode.java        | 21 +++-----
 .../apache/kafka/streams/StreamsBuilderTest.java   | 57 +++++++++++++++++++---
 5 files changed, 79 insertions(+), 30 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index f90d35827f1..e661d78fefc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -322,7 +322,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
 
         final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, MAPVALUES_NAME);
 
-        final KTableProcessorSupplier<K, V, K, VR> processorSupplier = new 
KTableMapValues<>(this, mapper, queryableStoreName);
+        final KTableProcessorSupplier<K, V, K, VR> processorSupplier = new 
KTableMapValues<>(this, mapper, queryableStoreName, storeFactory);
 
         // leaving in calls to ITB until building topology with graph
 
@@ -331,8 +331,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         );
         final GraphNode tableNode = new TableProcessorNode<>(
             name,
-            processorParameters,
-            storeFactory
+            processorParameters
         );
         maybeSetOutputVersioned(tableNode, materializedInternal);
 
@@ -1358,16 +1357,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
 
         final KTableSource<K, VR> resultProcessorSupplier = new 
KTableSource<>(materializedInternal);
 
-        final StoreFactory resultStore =
-            new KeyValueStoreMaterializer<>(materializedInternal);
-
         final TableProcessorNode<K, VR> resultNode = new TableProcessorNode<>(
             resultProcessorName,
             new ProcessorParameters<>(
                 resultProcessorSupplier,
                 resultProcessorName
-            ),
-            resultStore
+            )
         );
         resultNode.setOutputVersioned(materializedInternal.storeSupplier() 
instanceof VersionedBytesStoreSupplier);
         builder.addGraphNode(responseJoinNode, resultNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index c26488c12a1..af495c9b9a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -20,9 +20,14 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
+import java.util.Collections;
+import java.util.Set;
+
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 import static 
org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
 import static 
org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
@@ -33,13 +38,16 @@ class KTableMapValues<KIn, VIn, VOut> implements KTableProcessorSupplier<KIn, VI
     private final ValueMapperWithKey<? super KIn, ? super VIn, ? extends VOut> 
mapper;
     private final String queryableName;
     private boolean sendOldValues = false;
+    private final StoreFactory storeFactory;
 
     KTableMapValues(final KTableImpl<KIn, ?, VIn> parent,
                     final ValueMapperWithKey<? super KIn, ? super VIn, ? 
extends VOut> mapper,
-                    final String queryableName) {
+                    final String queryableName,
+                    final StoreFactory storeFactory) {
         this.parent = parent;
         this.mapper = mapper;
         this.queryableName = queryableName;
+        this.storeFactory = storeFactory;
     }
 
     @Override
@@ -47,6 +55,14 @@ class KTableMapValues<KIn, VIn, VOut> implements KTableProcessorSupplier<KIn, VI
         return new KTableMapValuesProcessor();
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        if (storeFactory == null) {
+            return null;
+        }
+        return Collections.singleton(new 
StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory));
+    }
+
     @Override
     public KTableValueGetterSupplier<KIn, VOut> view() {
         // if the KTable is materialized, use the materialized store to return 
getter value;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
index 1874bd807ed..38033693ebb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
@@ -26,7 +26,7 @@ public class TableFilterNode<K, V> extends TableProcessorNode<K, V> implements V
     public TableFilterNode(final String nodeName,
                            final ProcessorParameters<K, V, ?, ?> 
processorParameters,
                            final StoreFactory storeFactory) {
-        super(nodeName, processorParameters, storeFactory);
+        super(nodeName, processorParameters, storeFactory, null);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index b47252068e6..af3ab15d490 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -17,12 +17,10 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.streams.kstream.internals.KTableSource;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 
 import java.util.Arrays;
-import java.util.Objects;
 
 public class TableProcessorNode<K, V> extends GraphNode {
 
@@ -31,9 +29,8 @@ public class TableProcessorNode<K, V> extends GraphNode {
     private final String[] storeNames;
 
     public TableProcessorNode(final String nodeName,
-                              final ProcessorParameters<K, V, ?, ?> 
processorParameters,
-                              final StoreFactory storeFactory) {
-        this(nodeName, processorParameters, storeFactory, null);
+                              final ProcessorParameters<K, V, ?, ?> 
processorParameters) {
+        this(nodeName, processorParameters, null, null);
     }
 
     public TableProcessorNode(final String nodeName,
@@ -62,21 +59,17 @@ public class TableProcessorNode<K, V> extends GraphNode {
     @SuppressWarnings("unchecked")
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+        processorParameters.addProcessorTo(topologyBuilder, parentNodeNames());
+
         final String processorName = processorParameters.processorName();
-        topologyBuilder.addProcessor(processorName, 
processorParameters.processorSupplier(), parentNodeNames());
 
         if (storeNames.length > 0) {
+            // todo(rodesai): remove me once all operators have been moved to 
ProcessorSupplier
             topologyBuilder.connectProcessorAndStateStores(processorName, 
storeNames);
         }
 
-        final KTableSource<K, V> tableSource =  
processorParameters.processorSupplier() instanceof KTableSource ?
-                (KTableSource<K, V>) processorParameters.processorSupplier() : 
null;
-        if (tableSource != null) {
-            if (tableSource.materialized()) {
-                
topologyBuilder.addStateStore(Objects.requireNonNull(storeFactory, 
"storeFactory was null"),
-                                              processorName);
-            }
-        } else if (storeFactory != null) {
+        if (storeFactory != null) {
+            // todo(rodesai) remove when KTableImpl#doFilter, 
KTableImpl#doTransformValues moved to ProcessorSupplier
             topologyBuilder.addStateStore(storeFactory, processorName);
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 738f753f532..be38f051492 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -1621,6 +1621,26 @@ public class StreamsBuilderTest {
         assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
     }
 
+    @Test
+    public void shouldWrapProcessorsForMapValuesWithMaterializedStore() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        builder.table("input", Consumed.as("source-table"))
+            .mapValues(v -> null, Named.as("map-values"), 
Materialized.as("map-values-store"))
+            .toStream(Named.as("to-stream"))
+            .to("output-topic", Produced.as("sink"));
+        builder.build();
+
+        assertThat(counter.wrappedProcessorNames(),
+            Matchers.containsInAnyOrder("source-table", "map-values", 
"to-stream"));
+        assertThat(counter.numUniqueStateStores(), is(1));
+        assertThat(counter.numConnectedStateStores(), is(1));
+    }
+
     @Test
     public void shouldWrapProcessorsForTableAggregate() {
         final Map<Object, Object> props = dummyStreamsConfigMap();
@@ -1687,21 +1707,46 @@ public class StreamsBuilderTest {
             .selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3
             .peek((k, v) -> { }, Named.as("peek")) // wrapped 4
             .flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // 
wrapped 5
-            .toTable(Named.as("toTable")) // wrapped 6
-            .filter((k, v) -> true, Named.as("filter-table")) // should be 
wrapped once we do TableProcessorNode
-            .toStream(Named.as("toStream")) // wrapped 7
             .to("output", Produced.as("sink"));
 
         builder.build();
-        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(8));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
         assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
-            "filter-stream", "map", "selectKey", "peek", "flatMap",
-            "toTable-repartition-filter", "toStream", "toTable"
+            "filter-stream", "map", "selectKey", "peek", "flatMap"
         ));
         assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0));
         assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0));
     }
 
+    @Test
+    public void shouldWrapProcessorsWhenMultipleTableOperators() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        builder.stream("input", Consumed.as("source"))
+            .toTable(Named.as("to-table"))
+            .mapValues(v -> v, Named.as("map-values"))
+            .mapValues(v -> v, Named.as("map-values-stateful"), 
Materialized.as("map-values-stateful"))
+            .filter((k, v) -> true, Named.as("filter-table"))
+            .filter((k, v) -> true, Named.as("filter-table-stateful"), 
Materialized.as("filter-table-stateful"))
+            .toStream(Named.as("to-stream"))
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(6));
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "to-table", "map-values", "map-values-stateful",
+            "filter-table", "filter-table-stateful", "to-stream"
+        ));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+    }
+
     @Test
     public void shouldWrapProcessorsForUnmaterializedSourceTable() {
         final Map<Object, Object> props = dummyStreamsConfigMap();

Reply via email to