This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ed10fc63a9f KAFKA-18026: supply stores for KTable#mapValues using ProcessorSupplier#stores (#18155)
ed10fc63a9f is described below
commit ed10fc63a9fd3032a529f7e10a0f863ee71b710a
Author: Rohan <[email protected]>
AuthorDate: Sat Dec 14 20:18:49 2024 -0800
KAFKA-18026: supply stores for KTable#mapValues using ProcessorSupplier#stores (#18155)
KAFKA-18026: supply stores for KTable#mapValues using ProcessorSupplier#stores
Reviewers: Guozhang Wang <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>
---
.../streams/kstream/internals/KTableImpl.java | 11 ++---
.../streams/kstream/internals/KTableMapValues.java | 18 ++++++-
.../kstream/internals/graph/TableFilterNode.java | 2 +-
.../internals/graph/TableProcessorNode.java | 21 +++-----
.../apache/kafka/streams/StreamsBuilderTest.java | 57 +++++++++++++++++++---
5 files changed, 79 insertions(+), 30 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index f90d35827f1..e661d78fefc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -322,7 +322,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final String name = new
NamedInternal(named).orElseGenerateWithPrefix(builder, MAPVALUES_NAME);
- final KTableProcessorSupplier<K, V, K, VR> processorSupplier = new
KTableMapValues<>(this, mapper, queryableStoreName);
+ final KTableProcessorSupplier<K, V, K, VR> processorSupplier = new
KTableMapValues<>(this, mapper, queryableStoreName, storeFactory);
// leaving in calls to ITB until building topology with graph
@@ -331,8 +331,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
);
final GraphNode tableNode = new TableProcessorNode<>(
name,
- processorParameters,
- storeFactory
+ processorParameters
);
maybeSetOutputVersioned(tableNode, materializedInternal);
@@ -1358,16 +1357,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final KTableSource<K, VR> resultProcessorSupplier = new
KTableSource<>(materializedInternal);
- final StoreFactory resultStore =
- new KeyValueStoreMaterializer<>(materializedInternal);
-
final TableProcessorNode<K, VR> resultNode = new TableProcessorNode<>(
resultProcessorName,
new ProcessorParameters<>(
resultProcessorSupplier,
resultProcessorName
- ),
- resultStore
+ )
);
resultNode.setOutputVersioned(materializedInternal.storeSupplier()
instanceof VersionedBytesStoreSupplier);
builder.addGraphNode(responseJoinNode, resultNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index c26488c12a1..af495c9b9a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -20,9 +20,14 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
+import java.util.Collections;
+import java.util.Set;
+
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
import static
org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
import static
org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
@@ -33,13 +38,16 @@ class KTableMapValues<KIn, VIn, VOut> implements KTableProcessorSupplier<KIn, VI
private final ValueMapperWithKey<? super KIn, ? super VIn, ? extends VOut>
mapper;
private final String queryableName;
private boolean sendOldValues = false;
+ private final StoreFactory storeFactory;
KTableMapValues(final KTableImpl<KIn, ?, VIn> parent,
final ValueMapperWithKey<? super KIn, ? super VIn, ?
extends VOut> mapper,
- final String queryableName) {
+ final String queryableName,
+ final StoreFactory storeFactory) {
this.parent = parent;
this.mapper = mapper;
this.queryableName = queryableName;
+ this.storeFactory = storeFactory;
}
@Override
@@ -47,6 +55,14 @@ class KTableMapValues<KIn, VIn, VOut> implements KTableProcessorSupplier<KIn, VI
return new KTableMapValuesProcessor();
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ if (storeFactory == null) {
+ return null;
+ }
+ return Collections.singleton(new
StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory));
+ }
+
@Override
public KTableValueGetterSupplier<KIn, VOut> view() {
// if the KTable is materialized, use the materialized store to return
getter value;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
index 1874bd807ed..38033693ebb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
@@ -26,7 +26,7 @@ public class TableFilterNode<K, V> extends TableProcessorNode<K, V> implements V
public TableFilterNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?>
processorParameters,
final StoreFactory storeFactory) {
- super(nodeName, processorParameters, storeFactory);
+ super(nodeName, processorParameters, storeFactory, null);
}
@SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index b47252068e6..af3ab15d490 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -17,12 +17,10 @@
package org.apache.kafka.streams.kstream.internals.graph;
-import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import java.util.Arrays;
-import java.util.Objects;
public class TableProcessorNode<K, V> extends GraphNode {
@@ -31,9 +29,8 @@ public class TableProcessorNode<K, V> extends GraphNode {
private final String[] storeNames;
public TableProcessorNode(final String nodeName,
- final ProcessorParameters<K, V, ?, ?>
processorParameters,
- final StoreFactory storeFactory) {
- this(nodeName, processorParameters, storeFactory, null);
+ final ProcessorParameters<K, V, ?, ?>
processorParameters) {
+ this(nodeName, processorParameters, null, null);
}
public TableProcessorNode(final String nodeName,
@@ -62,21 +59,17 @@ public class TableProcessorNode<K, V> extends GraphNode {
@SuppressWarnings("unchecked")
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ processorParameters.addProcessorTo(topologyBuilder, parentNodeNames());
+
final String processorName = processorParameters.processorName();
- topologyBuilder.addProcessor(processorName,
processorParameters.processorSupplier(), parentNodeNames());
if (storeNames.length > 0) {
+ // todo(rodesai): remove me once all operators have been moved to
ProcessorSupplier
topologyBuilder.connectProcessorAndStateStores(processorName,
storeNames);
}
- final KTableSource<K, V> tableSource =
processorParameters.processorSupplier() instanceof KTableSource ?
- (KTableSource<K, V>) processorParameters.processorSupplier() :
null;
- if (tableSource != null) {
- if (tableSource.materialized()) {
-
topologyBuilder.addStateStore(Objects.requireNonNull(storeFactory,
"storeFactory was null"),
- processorName);
- }
- } else if (storeFactory != null) {
+ if (storeFactory != null) {
+ // todo(rodesai) remove when KTableImpl#doFilter,
KTableImpl#doTransformValues moved to ProcessorSupplier
topologyBuilder.addStateStore(storeFactory, processorName);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 738f753f532..be38f051492 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -1621,6 +1621,26 @@ public class StreamsBuilderTest {
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
}
+ @Test
+ public void shouldWrapProcessorsForMapValuesWithMaterializedStore() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ builder.table("input", Consumed.as("source-table"))
+ .mapValues(v -> null, Named.as("map-values"),
Materialized.as("map-values-store"))
+ .toStream(Named.as("to-stream"))
+ .to("output-topic", Produced.as("sink"));
+ builder.build();
+
+ assertThat(counter.wrappedProcessorNames(),
+ Matchers.containsInAnyOrder("source-table", "map-values",
"to-stream"));
+ assertThat(counter.numUniqueStateStores(), is(1));
+ assertThat(counter.numConnectedStateStores(), is(1));
+ }
+
@Test
public void shouldWrapProcessorsForTableAggregate() {
final Map<Object, Object> props = dummyStreamsConfigMap();
@@ -1687,21 +1707,46 @@ public class StreamsBuilderTest {
.selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3
.peek((k, v) -> { }, Named.as("peek")) // wrapped 4
.flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) //
wrapped 5
- .toTable(Named.as("toTable")) // wrapped 6
- .filter((k, v) -> true, Named.as("filter-table")) // should be
wrapped once we do TableProcessorNode
- .toStream(Named.as("toStream")) // wrapped 7
.to("output", Produced.as("sink"));
builder.build();
- assertThat(counter.numWrappedProcessors(), CoreMatchers.is(8));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
- "filter-stream", "map", "selectKey", "peek", "flatMap",
- "toTable-repartition-filter", "toStream", "toTable"
+ "filter-stream", "map", "selectKey", "peek", "flatMap"
));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0));
}
+ @Test
+ public void shouldWrapProcessorsWhenMultipleTableOperators() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ builder.stream("input", Consumed.as("source"))
+ .toTable(Named.as("to-table"))
+ .mapValues(v -> v, Named.as("map-values"))
+ .mapValues(v -> v, Named.as("map-values-stateful"),
Materialized.as("map-values-stateful"))
+ .filter((k, v) -> true, Named.as("filter-table"))
+ .filter((k, v) -> true, Named.as("filter-table-stateful"),
Materialized.as("filter-table-stateful"))
+ .toStream(Named.as("to-stream"))
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(6));
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "to-table", "map-values", "map-values-stateful",
+ "filter-table", "filter-table-stateful", "to-stream"
+ ));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+ }
+
@Test
public void shouldWrapProcessorsForUnmaterializedSourceTable() {
final Map<Object, Object> props = dummyStreamsConfigMap();