This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9b776ffc50c KAFKA-18026: KIP-1112 convert StreamToTableNode (#18149)
9b776ffc50c is described below
commit 9b776ffc50ca14ce3fb63dbbdfce5860db3a9f56
Author: Almog Gavra <[email protected]>
AuthorDate: Thu Dec 12 14:52:21 2024 -0800
KAFKA-18026: KIP-1112 convert StreamToTableNode (#18149)
Covers wrapping of processors and state stores for StreamToTableNode
Reviewers: Guozhang Wang <[email protected]>, Anna Sophie
Blee-Goldman <[email protected]>
---
.../streams/kstream/internals/KStreamImpl.java | 3 +--
.../kstream/internals/graph/StreamToTableNode.java | 29 +++-------------------
.../apache/kafka/streams/StreamsBuilderTest.java | 6 ++---
3 files changed, 7 insertions(+), 31 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 92cfdb04425..0ba9086e450 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -659,8 +659,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
final ProcessorParameters<K, V, ?, ?> processorParameters = new
ProcessorParameters<>(tableSource, name);
final GraphNode tableNode = new StreamToTableNode<>(
name,
- processorParameters,
- materializedInternal
+ processorParameters
);
tableNode.setOutputVersioned(materializedInternal.storeSupplier()
instanceof VersionedBytesStoreSupplier);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
index a6c825be0c8..08c171e824a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
@@ -17,13 +17,7 @@
package org.apache.kafka.streams.kstream.internals.graph;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.internals.KTableSource;
-import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
-import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.KeyValueStore;
/**
* Represents a KTable convert From KStream
@@ -31,37 +25,20 @@ import org.apache.kafka.streams.state.KeyValueStore;
public class StreamToTableNode<K, V> extends GraphNode {
private final ProcessorParameters<K, V, ?, ?> processorParameters;
- private final MaterializedInternal<K, V, ?> materializedInternal;
public StreamToTableNode(final String nodeName,
- final ProcessorParameters<K, V, ?, ?>
processorParameters,
- final MaterializedInternal<K, V, ?>
materializedInternal) {
+ final ProcessorParameters<K, V, ?, ?>
processorParameters) {
super(nodeName);
this.processorParameters = processorParameters;
- this.materializedInternal = materializedInternal;
}
@Override
public String toString() {
- return "StreamToTableNode{" +
- ", processorParameters=" + processorParameters +
- ", materializedInternal=" + materializedInternal +
- "} " + super.toString();
+ return "StreamToTableNode{" + super.toString() + "}";
}
- @SuppressWarnings("unchecked")
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
- final StoreFactory storeFactory =
- new KeyValueStoreMaterializer<>((MaterializedInternal<K, V,
KeyValueStore<Bytes, byte[]>>) materializedInternal);
-
- final String processorName = processorParameters.processorName();
- final KTableSource<K, V> tableSource =
processorParameters.processorSupplier() instanceof KTableSource ?
- (KTableSource<K, V>) processorParameters.processorSupplier() :
null;
- topologyBuilder.addProcessor(processorName,
processorParameters.processorSupplier(), parentNodeNames());
-
- if (storeFactory != null && tableSource.materialized()) {
- topologyBuilder.addStateStore(storeFactory, processorName);
- }
+ processorParameters.addProcessorTo(topologyBuilder, parentNodeNames());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 5371d53949f..00c745fd735 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -1686,16 +1686,16 @@ public class StreamsBuilderTest {
.selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3
.peek((k, v) -> { }, Named.as("peek")) // wrapped 4
.flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) //
wrapped 5
- .toTable(Named.as("toTable")) // should be wrapped when we do
StreamToTableNode
+ .toTable(Named.as("toTable")) // wrapped 6
.filter((k, v) -> true, Named.as("filter-table")) // should be
wrapped once we do TableProcessorNode
.toStream(Named.as("toStream")) // wrapped 7
.to("output", Produced.as("sink"));
builder.build();
- assertThat(counter.numWrappedProcessors(), CoreMatchers.is(7));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(8));
assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
"filter-stream", "map", "selectKey", "peek", "flatMap",
- "toTable-repartition-filter", "toStream"
+ "toTable-repartition-filter", "toStream", "toTable"
));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0));