This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9b776ffc50c  KAFKA-18026: KIP-1112 convert StreamToTableNode  (#18149)
9b776ffc50c is described below

commit 9b776ffc50ca14ce3fb63dbbdfce5860db3a9f56
Author: Almog Gavra <[email protected]>
AuthorDate: Thu Dec 12 14:52:21 2024 -0800

     KAFKA-18026: KIP-1112 convert StreamToTableNode  (#18149)
    
    Covers wrapping of processors and state stores for StreamToTableSource
    
    Reviewers: Guozhang Wang <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>
---
 .../streams/kstream/internals/KStreamImpl.java     |  3 +--
 .../kstream/internals/graph/StreamToTableNode.java | 29 +++-------------------
 .../apache/kafka/streams/StreamsBuilderTest.java   |  6 ++---
 3 files changed, 7 insertions(+), 31 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 92cfdb04425..0ba9086e450 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -659,8 +659,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
         final ProcessorParameters<K, V, ?, ?> processorParameters = new 
ProcessorParameters<>(tableSource, name);
         final GraphNode tableNode = new StreamToTableNode<>(
             name,
-            processorParameters,
-            materializedInternal
+            processorParameters
         );
         tableNode.setOutputVersioned(materializedInternal.storeSupplier() 
instanceof VersionedBytesStoreSupplier);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
index a6c825be0c8..08c171e824a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
@@ -17,13 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.internals.KTableSource;
-import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
-import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.KeyValueStore;
 
 /**
  * Represents a KTable convert From KStream
@@ -31,37 +25,20 @@ import org.apache.kafka.streams.state.KeyValueStore;
 public class StreamToTableNode<K, V> extends GraphNode {
 
     private final ProcessorParameters<K, V, ?, ?> processorParameters;
-    private final MaterializedInternal<K, V, ?> materializedInternal;
 
     public StreamToTableNode(final String nodeName,
-                             final ProcessorParameters<K, V, ?, ?> 
processorParameters,
-                             final MaterializedInternal<K, V, ?> 
materializedInternal) {
+                             final ProcessorParameters<K, V, ?, ?> 
processorParameters) {
         super(nodeName);
         this.processorParameters = processorParameters;
-        this.materializedInternal = materializedInternal;
     }
 
     @Override
     public String toString() {
-        return "StreamToTableNode{" +
-            ", processorParameters=" + processorParameters +
-            ", materializedInternal=" + materializedInternal +
-            "} " + super.toString();
+        return "StreamToTableNode{" + super.toString() + "}";
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
-        final StoreFactory storeFactory =
-            new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, 
KeyValueStore<Bytes, byte[]>>) materializedInternal);
-
-        final String processorName = processorParameters.processorName();
-        final KTableSource<K, V> tableSource =  
processorParameters.processorSupplier() instanceof KTableSource ?
-                (KTableSource<K, V>) processorParameters.processorSupplier() : 
null;
-        topologyBuilder.addProcessor(processorName, 
processorParameters.processorSupplier(), parentNodeNames());
-
-        if (storeFactory != null && tableSource.materialized()) {
-            topologyBuilder.addStateStore(storeFactory, processorName);
-        }
+        processorParameters.addProcessorTo(topologyBuilder, parentNodeNames());
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 5371d53949f..00c745fd735 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -1686,16 +1686,16 @@ public class StreamsBuilderTest {
             .selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3
             .peek((k, v) -> { }, Named.as("peek")) // wrapped 4
             .flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // 
wrapped 5
-            .toTable(Named.as("toTable")) // should be wrapped when we do 
StreamToTableNode
+            .toTable(Named.as("toTable")) // wrapped 6
             .filter((k, v) -> true, Named.as("filter-table")) // should be 
wrapped once we do TableProcessorNode
             .toStream(Named.as("toStream")) // wrapped 7
             .to("output", Produced.as("sink"));
 
         builder.build();
-        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(7));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(8));
         assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
             "filter-stream", "map", "selectKey", "peek", "flatMap",
-            "toTable-repartition-filter", "toStream"
+            "toTable-repartition-filter", "toStream", "toTable"
         ));
         assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0));
         assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0));

Reply via email to