This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 21563380f3e KAFKA-18026: KIP-1112, migrate table-table joins to use
ProcessorSupplier#stores (#18048)
21563380f3e is described below
commit 21563380f3e5bfab901d60319d58de93cada61d3
Author: Almog Gavra <[email protected]>
AuthorDate: Wed Dec 11 17:37:34 2024 -0800
KAFKA-18026: KIP-1112, migrate table-table joins to use
ProcessorSupplier#stores (#18048)
Covers wrapping of processors and state stores for KTable-KTable joins
Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Guozhang
Wang <[email protected]>
---
.../streams/kstream/internals/KTableImpl.java | 50 ++++++-----
.../internals/KTableKTableAbstractJoin.java | 8 ++
.../kstream/internals/KTableKTableJoinMerger.java | 21 ++++-
.../internals/graph/KTableKTableJoinNode.java | 54 +++---------
.../internals/graph/ProcessorParameters.java | 2 +-
.../apache/kafka/streams/StreamsBuilderTest.java | 96 ++++++++++++++++++++++
6 files changed, 160 insertions(+), 71 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index a0d5ff4d7c0..f90d35827f1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -748,26 +748,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
((KTableImpl<?, ?, ?>) other).enableSendingOldValues(true);
}
- final KTableKTableAbstractJoin<K, V, VO, VR> joinThis;
- final KTableKTableAbstractJoin<K, VO, V, VR> joinOther;
-
- if (!leftOuter) { // inner
- joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?,
VO>) other, joiner);
- joinOther = new KTableKTableInnerJoin<>((KTableImpl<K, ?, VO>)
other, this, reverseJoiner(joiner));
- } else if (!rightOuter) { // left
- joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, VO>)
other, joiner);
- joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, VO>)
other, this, reverseJoiner(joiner));
- } else { // outer
- joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?,
VO>) other, joiner);
- joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, VO>)
other, this, reverseJoiner(joiner));
- }
-
- final String joinThisName = renamed.suffixWithOrElseGet("-join-this",
builder, JOINTHIS_NAME);
- final String joinOtherName =
renamed.suffixWithOrElseGet("-join-other", builder, JOINOTHER_NAME);
-
- final ProcessorParameters<K, Change<V>, ?, ?>
joinThisProcessorParameters = new ProcessorParameters<>(joinThis, joinThisName);
- final ProcessorParameters<K, Change<VO>, ?, ?>
joinOtherProcessorParameters = new ProcessorParameters<>(joinOther,
joinOtherName);
-
final Serde<K> keySerde;
final Serde<VR> valueSerde;
final String queryableStoreName;
@@ -788,19 +768,45 @@ public class KTableImpl<K, S, V> extends
AbstractStream<K, V> implements KTable<
storeFactory = null;
}
+ final KTableKTableAbstractJoin<K, V, VO, VR> joinThis;
+ final KTableKTableAbstractJoin<K, VO, V, VR> joinOther;
+
+ if (!leftOuter) { // inner
+ joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?,
VO>) other, joiner);
+ joinOther = new KTableKTableInnerJoin<>((KTableImpl<K, ?, VO>)
other, this, reverseJoiner(joiner));
+ } else if (!rightOuter) { // left
+ joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, VO>)
other, joiner);
+ joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, VO>)
other, this, reverseJoiner(joiner));
+ } else { // outer
+ joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?,
VO>) other, joiner);
+ joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, VO>)
other, this, reverseJoiner(joiner));
+ }
+
+ final String joinThisName = renamed.suffixWithOrElseGet("-join-this",
builder, JOINTHIS_NAME);
+ final String joinOtherName =
renamed.suffixWithOrElseGet("-join-other", builder, JOINOTHER_NAME);
+
+ final ProcessorParameters<K, Change<V>, ?, ?>
joinThisProcessorParameters = new ProcessorParameters<>(joinThis, joinThisName);
+ final ProcessorParameters<K, Change<VO>, ?, ?>
joinOtherProcessorParameters = new ProcessorParameters<>(joinOther,
joinOtherName);
+ final ProcessorParameters<K, Change<VR>, ?, ?>
joinMergeProcessorParameters = new ProcessorParameters<>(
+ KTableKTableJoinMerger.of(
+ (KTableProcessorSupplier<K, V, K, VR>)
joinThisProcessorParameters.processorSupplier(),
+ (KTableProcessorSupplier<K, VO, K, VR>)
joinOtherProcessorParameters.processorSupplier(),
+ queryableStoreName,
+ storeFactory),
+ joinMergeName);
+
final KTableKTableJoinNode<K, V, VO, VR> kTableKTableJoinNode =
KTableKTableJoinNode.<K, V, VO, VR>kTableKTableJoinNodeBuilder()
.withNodeName(joinMergeName)
.withJoinThisProcessorParameters(joinThisProcessorParameters)
.withJoinOtherProcessorParameters(joinOtherProcessorParameters)
+ .withMergeProcessorParameters(joinMergeProcessorParameters)
.withThisJoinSideNodeName(name)
.withOtherJoinSideNodeName(((KTableImpl<?, ?, ?>) other).name)
.withJoinThisStoreNames(valueGetterSupplier().storeNames())
.withJoinOtherStoreNames(((KTableImpl<?, ?, ?>)
other).valueGetterSupplier().storeNames())
.withKeySerde(keySerde)
.withValueSerde(valueSerde)
- .withQueryableStoreName(queryableStoreName)
- .withStoreBuilder(storeFactory)
.build();
final boolean isOutputVersioned = materializedInternal != null
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
index 21339c0b649..6db3388c81c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
@@ -17,6 +17,9 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.Set;
public abstract class KTableKTableAbstractJoin<K, V1, V2, VOut> implements
KTableProcessorSupplier<K, V1, K, VOut> {
@@ -49,6 +52,11 @@ public abstract class KTableKTableAbstractJoin<K, V1, V2,
VOut> implements
return true;
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return null;
+ }
+
public void setUseVersionedSemantics(final boolean useVersionedSemantics) {
this.useVersionedSemantics = useVersionedSemantics;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 7924f8ea857..bf9ddef3356 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -20,6 +20,8 @@ import
org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import java.util.Collections;
@@ -34,14 +36,17 @@ public class KTableKTableJoinMerger<K, V> implements
KTableProcessorSupplier<K,
private final KTableProcessorSupplier<K, ?, K, V> parent1;
private final KTableProcessorSupplier<K, ?, K, V> parent2;
private final String queryableName;
+ private final StoreFactory storeFactory;
private boolean sendOldValues = false;
KTableKTableJoinMerger(final KTableProcessorSupplier<K, ?, K, V> parent1,
final KTableProcessorSupplier<K, ?, K, V> parent2,
- final String queryableName) {
+ final String queryableName,
+ final StoreFactory storeFactory) {
this.parent1 = parent1;
this.parent2 = parent2;
this.queryableName = queryableName;
+ this.storeFactory = storeFactory;
}
public String queryableName() {
@@ -53,6 +58,13 @@ public class KTableKTableJoinMerger<K, V> implements
KTableProcessorSupplier<K,
return new KTableKTableJoinMergeProcessor();
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return storeFactory == null
+ ? null
+ : Set.of(new
StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory));
+ }
+
@Override
public KTableValueGetterSupplier<K, V> view() {
// if the result KTable is materialized, use the materialized store to
return getter value;
@@ -90,13 +102,14 @@ public class KTableKTableJoinMerger<K, V> implements
KTableProcessorSupplier<K,
public static <K, V> KTableKTableJoinMerger<K, V> of(final
KTableProcessorSupplier<K, ?, K, V> parent1,
final
KTableProcessorSupplier<K, ?, K, V> parent2) {
- return of(parent1, parent2, null);
+ return of(parent1, parent2, null, null);
}
public static <K, V> KTableKTableJoinMerger<K, V> of(final
KTableProcessorSupplier<K, ?, K, V> parent1,
final
KTableProcessorSupplier<K, ?, K, V> parent2,
- final String
queryableName) {
- return new KTableKTableJoinMerger<>(parent1, parent2, queryableName);
+ final String
queryableName,
+ final StoreFactory
stores) {
+ return new KTableKTableJoinMerger<>(parent1, parent2, queryableName,
stores);
}
private class KTableKTableJoinMergeProcessor extends
ContextualProcessor<K, Change<V>, K, Change<V>> {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 6cfdd53784c..275a88b7676 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -21,10 +21,8 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoin;
import org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger;
-import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
import java.util.Arrays;
@@ -37,7 +35,6 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
private final Serde<VR> valueSerde;
private final String[] joinThisStoreNames;
private final String[] joinOtherStoreNames;
- private final StoreFactory storeFactory;
KTableKTableJoinNode(final String nodeName,
final ProcessorParameters<K, Change<V1>, ?, ?>
joinThisProcessorParameters,
@@ -48,8 +45,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
final Serde<K> keySerde,
final Serde<VR> valueSerde,
final String[] joinThisStoreNames,
- final String[] joinOtherStoreNames,
- final StoreFactory storeFactory) {
+ final String[] joinOtherStoreNames) {
super(nodeName,
null,
@@ -63,7 +59,6 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
this.valueSerde = valueSerde;
this.joinThisStoreNames = joinThisStoreNames;
this.joinOtherStoreNames = joinOtherStoreNames;
- this.storeFactory = storeFactory;
}
public Serde<K> keySerde() {
@@ -120,30 +115,13 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
final String thisProcessorName =
thisProcessorParameters().processorName();
final String otherProcessorName =
otherProcessorParameters().processorName();
- final String mergeProcessorName =
mergeProcessorParameters().processorName();
- topologyBuilder.addProcessor(
- thisProcessorName,
- thisProcessorParameters().processorSupplier(),
- thisJoinSideNodeName());
-
- topologyBuilder.addProcessor(
- otherProcessorName,
- otherProcessorParameters().processorSupplier(),
- otherJoinSideNodeName());
-
- topologyBuilder.addProcessor(
- mergeProcessorName,
- mergeProcessorParameters().processorSupplier(),
- thisProcessorName,
- otherProcessorName);
+ thisProcessorParameters().addProcessorTo(topologyBuilder,
thisJoinSideNodeName());
+ otherProcessorParameters().addProcessorTo(topologyBuilder,
otherJoinSideNodeName());
+ mergeProcessorParameters().addProcessorTo(topologyBuilder,
thisProcessorName, otherProcessorName);
topologyBuilder.connectProcessorAndStateStores(thisProcessorName,
joinOtherStoreNames);
topologyBuilder.connectProcessorAndStateStores(otherProcessorName,
joinThisStoreNames);
-
- if (storeFactory != null) {
- topologyBuilder.addStateStore(storeFactory, mergeProcessorName);
- }
}
@Override
@@ -168,8 +146,8 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
private Serde<VR> valueSerde;
private String[] joinThisStoreNames;
private String[] joinOtherStoreNames;
- private String queryableStoreName;
- private StoreFactory storeFactory;
+ private ProcessorParameters<K, Change<VR>, ?, ?>
+ joinMergeProcessorParameters;
private KTableKTableJoinNodeBuilder() {
}
@@ -219,35 +197,23 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
return this;
}
- public KTableKTableJoinNodeBuilder<K, V1, V2, VR>
withQueryableStoreName(final String queryableStoreName) {
- this.queryableStoreName = queryableStoreName;
- return this;
- }
-
- public KTableKTableJoinNodeBuilder<K, V1, V2, VR>
withStoreBuilder(final StoreFactory storeFactory) {
- this.storeFactory = storeFactory;
+ public KTableKTableJoinNodeBuilder<K, V1, V2, VR>
withMergeProcessorParameters(final ProcessorParameters<K, Change<VR>, ?, ?>
joinMergeProcessorParameters) {
+ this.joinMergeProcessorParameters = joinMergeProcessorParameters;
return this;
}
- @SuppressWarnings("unchecked")
public KTableKTableJoinNode<K, V1, V2, VR> build() {
return new KTableKTableJoinNode<>(
nodeName,
joinThisProcessorParameters,
joinOtherProcessorParameters,
- new ProcessorParameters<>(
- KTableKTableJoinMerger.of(
- (KTableProcessorSupplier<K, V1, K, VR>)
joinThisProcessorParameters.processorSupplier(),
- (KTableProcessorSupplier<K, V2, K, VR>)
joinOtherProcessorParameters.processorSupplier(),
- queryableStoreName),
- nodeName),
+ joinMergeProcessorParameters,
thisJoinSide,
otherJoinSide,
keySerde,
valueSerde,
joinThisStoreNames,
- joinOtherStoreNames,
- storeFactory
+ joinOtherStoreNames
);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
index bae4172ebb6..7cfbc94533e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
@@ -63,7 +63,7 @@ public class ProcessorParameters<KIn, VIn, KOut, VOut> {
return fixedKeyProcessorSupplier;
}
- public void addProcessorTo(final InternalTopologyBuilder topologyBuilder,
final String[] parentNodeNames) {
+ public void addProcessorTo(final InternalTopologyBuilder topologyBuilder,
final String... parentNodeNames) {
if (processorSupplier != null) {
ApiUtils.checkSupplier(processorSupplier);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 056721fa8af..5371d53949f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -1807,6 +1807,102 @@ public class StreamsBuilderTest {
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
}
+ @Test
+ public void shouldWrapProcessorsForTableTableInnerJoin() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KTable<String, String> t1 = builder.table("input1",
Consumed.as("input1")); // 1
+ final KTable<String, String> t2 = builder.table("input2",
Consumed.as("input2")); // 2
+
+ t1.join(t2, (v1, v2) -> v1 + v2, Named.as("join-processor"),
Materialized.as("the_join")) // 3 (this), 4 (other), 5 (merger)
+ .toStream(Named.as("toStream")) // 6
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(6));
+ assertThat(counter.wrappedProcessorNames().toString(),
counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
+ "input1",
+ "input2",
+ "join-processor-join-this",
+ "join-processor-join-other",
+ "join-processor",
+ "toStream"
+ ));
+
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(3)); //
table1, table2, join materialized
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(3));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForTableTableLeftJoin() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KTable<String, String> t1 = builder.table("input1",
Consumed.as("input1")); // 1
+ final KTable<String, String> t2 = builder.table("input2",
Consumed.as("input2")); // 2
+
+ t1.leftJoin(t2, (v1, v2) -> v1 + v2, Named.as("join-processor"),
Materialized.as("the_join")) // 3 (this), 4 (other), 5 (merger)
+ .toStream(Named.as("toStream")) // 6
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(6));
+ assertThat(counter.wrappedProcessorNames().toString(),
counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
+ "input1",
+ "input2",
+ "join-processor-join-this",
+ "join-processor-join-other",
+ "join-processor",
+ "toStream"
+ ));
+
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(3)); //
table1, table2, join materialized
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(3));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForTableTableOuterJoin() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KTable<String, String> t1 = builder.table("input1",
Consumed.as("input1")); // 1
+ final KTable<String, String> t2 = builder.table("input2",
Consumed.as("input2")); // 2
+
+ t1.outerJoin(t2, (v1, v2) -> v1 + v2, Named.as("join-processor"),
Materialized.as("the_join")) // 3 (this), 4 (other), 5 (merger)
+ .toStream(Named.as("toStream")) // 6
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(6));
+ assertThat(counter.wrappedProcessorNames().toString(),
counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
+ "input1",
+ "input2",
+ "join-processor-join-this",
+ "join-processor-join-other",
+ "join-processor",
+ "toStream"
+ ));
+
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(3)); //
table1, table2, join materialized
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(3));
+ }
+
@Test
public void shouldAllowStreamsFromSameTopic() {
builder.stream("topic");