This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 09e8fa2dbe0 KAFKA-18026: KIP-1112, migrate stream-table joins to use
ProcessorSupplier#stores (#18047)
09e8fa2dbe0 is described below
commit 09e8fa2dbe0517fcb08d756f67dfd4061f275a71
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Dec 5 10:06:11 2024 -0800
KAFKA-18026: KIP-1112, migrate stream-table joins to use
ProcessorSupplier#stores (#18047)
This change covers wrapping of processors and state stores for KStream-KTable joins.
Reviewers: Almog Gavra <[email protected]>, Guozhang Wang
<[email protected]>
---
.../streams/kstream/internals/KStreamImpl.java | 18 +++----
.../kstream/internals/KStreamKTableJoin.java | 21 ++++++--
.../internals/graph/StreamTableJoinNode.java | 12 +----
.../apache/kafka/streams/StreamsBuilderTest.java | 60 ++++++++++++++++++++++
4 files changed, 88 insertions(+), 23 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index b650724055b..820c31f29e4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -61,8 +61,8 @@ import
org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import
org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer;
@@ -1127,7 +1127,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
leftJoin);
final ProcessorParameters<K, V, ?, ?> processorParameters = new
ProcessorParameters<>(processorSupplier, name);
final StreamTableJoinNode<K, V> streamTableJoinNode =
- new StreamTableJoinNode<>(name, processorParameters, new String[]
{}, null, null, Optional.empty());
+ new StreamTableJoinNode<>(name, processorParameters, new String[]
{}, null, null);
if (leftJoin) {
streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
@@ -1159,16 +1159,14 @@ public class KStreamImpl<K, V> extends
AbstractStream<K, V> implements KStream<K
final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin
? LEFTJOIN_NAME : JOIN_NAME);
- Optional<String> bufferStoreName = Optional.empty();
+ Optional<StoreBuilder<?>> bufferStoreBuilder = Optional.empty();
if (joinedInternal.gracePeriod() != null) {
if (!((KTableImpl<K, ?, VO>)
table).graphNode.isOutputVersioned().orElse(true)) {
throw new IllegalArgumentException("KTable must be versioned
to use a grace period in a stream table join.");
}
- bufferStoreName = Optional.of(name + "-Buffer");
- final RocksDBTimeOrderedKeyValueBuffer.Builder<Object, Object>
storeBuilder =
- new
RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(),
joinedInternal.gracePeriod(), name);
-
builder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder));
+ final String bufferName = name + "-Buffer";
+ bufferStoreBuilder = Optional.of(new
RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferName,
joinedInternal.gracePeriod(), name));
}
final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new
KStreamKTableJoin<>(
@@ -1176,7 +1174,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
joiner,
leftJoin,
Optional.ofNullable(joinedInternal.gracePeriod()),
- bufferStoreName);
+ bufferStoreBuilder
+ );
final ProcessorParameters<K, V, ?, ?> processorParameters = new
ProcessorParameters<>(processorSupplier, name);
final StreamTableJoinNode<K, V> streamTableJoinNode = new
StreamTableJoinNode<>(
@@ -1184,8 +1183,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
processorParameters,
((KTableImpl<K, ?, VO>) table).valueGetterSupplier().storeNames(),
this.name,
- joinedInternal.gracePeriod(),
- bufferStoreName
+ joinedInternal.gracePeriod()
);
builder.addGraphNode(graphNode, streamTableJoinNode);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
index d1ea36a470c..8fc775e2b9b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
@@ -20,9 +20,13 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
import java.time.Duration;
import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.singleton;
class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1,
K, VOut> {
@@ -32,18 +36,29 @@ class KStreamKTableJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1, K,
private final boolean leftJoin;
private final Optional<Duration> gracePeriod;
private final Optional<String> storeName;
-
+ private final Set<StoreBuilder<?>> stores;
KStreamKTableJoin(final KTableValueGetterSupplier<K, V2>
valueGetterSupplier,
final ValueJoinerWithKey<? super K, ? super V1, ? super
V2, VOut> joiner,
final boolean leftJoin,
final Optional<Duration> gracePeriod,
- final Optional<String> storeName) {
+ final Optional<StoreBuilder<?>> bufferStoreBuilder) {
this.valueGetterSupplier = valueGetterSupplier;
this.joiner = joiner;
this.leftJoin = leftJoin;
this.gracePeriod = gracePeriod;
- this.storeName = storeName;
+ this.storeName = bufferStoreBuilder.map(StoreBuilder::name);
+
+ if (bufferStoreBuilder.isEmpty()) {
+ this.stores = null;
+ } else {
+ this.stores = singleton(bufferStoreBuilder.get());
+ }
+ }
+
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return stores;
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
index ad6083cbbcf..b73a8caaa55 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
@@ -17,12 +17,10 @@
package org.apache.kafka.streams.kstream.internals.graph;
-import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import java.time.Duration;
import java.util.Arrays;
-import java.util.Optional;
/**
* Represents a join between a KStream and a KTable or GlobalKTable
@@ -34,15 +32,12 @@ public class StreamTableJoinNode<K, V> extends GraphNode {
private final ProcessorParameters<K, V, ?, ?> processorParameters;
private final String otherJoinSideNodeName;
private final Duration gracePeriod;
- private final Optional<String> bufferName;
-
public StreamTableJoinNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?>
processorParameters,
final String[] storeNames,
final String otherJoinSideNodeName,
- final Duration gracePeriod,
- final Optional<String> bufferName) {
+ final Duration gracePeriod) {
super(nodeName);
// in the case of Stream-Table join the state stores associated with
the KTable
@@ -50,7 +45,6 @@ public class StreamTableJoinNode<K, V> extends GraphNode {
this.processorParameters = processorParameters;
this.otherJoinSideNodeName = otherJoinSideNodeName;
this.gracePeriod = gracePeriod;
- this.bufferName = bufferName;
}
@Override
@@ -65,15 +59,13 @@ public class StreamTableJoinNode<K, V> extends GraphNode {
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
final String processorName = processorParameters.processorName();
- final ProcessorSupplier<K, V, ?, ?> processorSupplier =
processorParameters.processorSupplier();
// Stream - Table join (Global or KTable)
- topologyBuilder.addProcessor(processorName, processorSupplier,
parentNodeNames());
+ processorParameters.addProcessorTo(topologyBuilder, parentNodeNames());
// Steam - KTable join only
if (otherJoinSideNodeName != null) {
topologyBuilder.connectProcessorAndStateStores(processorName,
storeNames);
- bufferName.ifPresent(s ->
topologyBuilder.connectProcessorAndStateStores(processorName, s));
if (gracePeriod != null) {
for (final String storeName : storeNames) {
if (!topologyBuilder.isStoreVersioned(storeName)) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 36559b18e29..0c9031afbdf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -1756,6 +1756,66 @@ public class StreamsBuilderTest {
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
}
+ @Test
+ public void shouldWrapProcessorsForStreamTableJoin() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream = builder.stream("input",
Consumed.as("source-stream"));
+ final KTable<String, String> table = builder.table("input-table",
Consumed.as("source-table"));
+
+ stream.join(
+ table,
+ MockValueJoiner.TOSTRING_JOINER,
+ Joined.as("st-join"))
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "source-table", "st-join"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForStreamTableJoinWithGracePeriod() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream = builder.stream("input",
Consumed.as("source-stream"));
+ final KTable<String, String> table = builder.table(
+ "input-table",
+ Consumed.as("versioned-source-table"),
+
Materialized.as(Stores.persistentVersionedKeyValueStore("table-store",
Duration.ofDays(1)))
+ );
+
+ stream.join(
+ table,
+ MockValueJoiner.TOSTRING_JOINER,
+ Joined.<String, String,
String>as("st-join").withGracePeriod(Duration.ofDays(1)))
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "versioned-source-table", "st-join"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
+ }
+
@Test
public void shouldAllowStreamsFromSameTopic() {
builder.stream("topic");