This is an automated email from the ASF dual-hosted git repository. ableegoldman pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push: new fe3bc9c709d KAFKA-16046: also fix stores for outer join (#15073) fe3bc9c709d is described below commit fe3bc9c709dded9920cad0e65812b747b964a649 Author: Almog Gavra <almog.ga...@gmail.com> AuthorDate: Tue Jan 2 15:07:46 2024 -0800 KAFKA-16046: also fix stores for outer join (#15073) This is the corollary to #15061 for outer joins, which don't use timestamped KV stores either (compared to just window stores). Reviewers: Anna Sophie Blee-Goldman <ableegold...@apache.org>, Lucas Brutschy <lbruts...@confluent.io> --- .../internals/KeyValueStoreMaterializer.java | 2 +- .../internals/OuterStreamJoinStoreFactory.java | 7 ++-- .../streams/state/BuiltInDslStoreSuppliers.java | 4 +- .../kafka/streams/state/DslKeyValueParams.java | 17 ++++++-- .../kafka/streams/state/DslWindowParams.java | 7 +++- .../apache/kafka/streams/StreamsBuilderTest.java | 4 +- .../internals/KStreamKStreamOuterJoinTest.java | 49 ++++++++++++++++++++++ 7 files changed, 77 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java index b54ba531815..3d11c451429 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java @@ -45,7 +45,7 @@ public class KeyValueStoreMaterializer<K, V> extends MaterializedStoreFactory<K, @Override public StateStore build() { final KeyValueBytesStoreSupplier supplier = materialized.storeSupplier() == null - ? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName())) + ? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName(), true)) : (KeyValueBytesStoreSupplier) materialized.storeSupplier(); final StoreBuilder<?> builder; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java index 123d1eade63..978bf39577c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java @@ -95,11 +95,12 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends AbstractConfigurable final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde()); final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoined.valueSerde(), streamJoined.otherValueSerde()); + final DslKeyValueParams dslKeyValueParams = new DslKeyValueParams(name, false); final KeyValueBytesStoreSupplier supplier; if (passedInDslStoreSuppliers != null) { // case 1: dslStoreSuppliers was explicitly passed in - supplier = passedInDslStoreSuppliers.keyValueStore(new DslKeyValueParams(name)); + supplier = passedInDslStoreSuppliers.keyValueStore(dslKeyValueParams); } else if (streamJoined.thisStoreSupplier() != null) { // case 2: thisStoreSupplier was explicitly passed in, we match // the type for that one @@ -110,12 +111,12 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends AbstractConfigurable } else { // couldn't determine the type of bytes store for thisStoreSupplier, // fallback to the default - supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name)); + supplier = dslStoreSuppliers().keyValueStore(dslKeyValueParams); } } else { // case 3: nothing was explicitly passed in, fallback to default which // was configured via either the TopologyConfig or StreamsConfig globally - supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name)); + supplier = dslStoreSuppliers().keyValueStore(dslKeyValueParams); } final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> diff --git a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java index 71e7ac869fc..eedcafd7c79 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java @@ -36,7 +36,9 @@ public class BuiltInDslStoreSuppliers { @Override public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) { - return Stores.persistentTimestampedKeyValueStore(params.name()); + return params.isTimestamped() + ? Stores.persistentTimestampedKeyValueStore(params.name()) + : Stores.persistentKeyValueStore(params.name()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java index 1077da45fb3..7447d6c711f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java @@ -25,19 +25,26 @@ import java.util.Objects; public class DslKeyValueParams { private final String name; + private final boolean isTimestamped; /** - * @param name the name of the store (cannot be {@code null}) + * @param name the name of the store (cannot be {@code null}) + * @param isTimestamped whether the returned stores should be timestamped, see ({@link TimestampedKeyValueStore} */ - public DslKeyValueParams(final String name) { + public DslKeyValueParams(final String name, final boolean isTimestamped) { Objects.requireNonNull(name); this.name = name; + this.isTimestamped = isTimestamped; } public String name() { return name; } + public boolean isTimestamped() { + return isTimestamped; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -47,18 +54,20 @@ public class DslKeyValueParams { return false; } final DslKeyValueParams that = (DslKeyValueParams) o; - return Objects.equals(name, that.name); + return isTimestamped == that.isTimestamped + && Objects.equals(name, that.name); } @Override public int hashCode() { - return Objects.hash(name); + return Objects.hash(name, isTimestamped); } @Override public String toString() { return "DslKeyValueParams{" + "name='" + name + '\'' + + "isTimestamped=" + isTimestamped + '}'; } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java b/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java index 672afc66e0d..9a7f0fe9b38 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java @@ -108,7 +108,8 @@ public class DslWindowParams { && Objects.equals(retentionPeriod, that.retentionPeriod) && Objects.equals(windowSize, that.windowSize) && Objects.equals(emitStrategy, that.emitStrategy) - && Objects.equals(isSlidingWindow, that.isSlidingWindow); + && Objects.equals(isSlidingWindow, that.isSlidingWindow) + && Objects.equals(isTimestamped, that.isTimestamped); } @Override @@ -119,7 +120,8 @@ public class DslWindowParams { windowSize, retainDuplicates, emitStrategy, - isSlidingWindow + isSlidingWindow, + isTimestamped ); } @@ -132,6 +134,7 @@ public class DslWindowParams { ", retainDuplicates=" + retainDuplicates + ", emitStrategy=" + emitStrategy + ", isSlidingWindow=" + isSlidingWindow + + ", isTimestamped=" + isTimestamped + '}'; } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index e059da4fa87..72422f067d8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -52,7 +52,7 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; import org.apache.kafka.streams.state.internals.InMemorySessionStore; import org.apache.kafka.streams.state.internals.InMemoryWindowStore; -import org.apache.kafka.streams.state.internals.RocksDBTimestampedStore; +import org.apache.kafka.streams.state.internals.RocksDBStore; import org.apache.kafka.streams.state.internals.RocksDBWindowStore; import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.apache.kafka.test.MockApiProcessorSupplier; @@ -1299,7 +1299,7 @@ public class StreamsBuilderTest { assertTypesForStateStore(topology.stateStores(), InMemoryWindowStore.class, RocksDBWindowStore.class, - RocksDBTimestampedStore.class); + RocksDBStore.class); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java index 8133e25ec4b..099dc5b0c83 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java @@ -33,8 +33,12 @@ import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.StreamJoined; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers; +import org.apache.kafka.streams.state.DslKeyValueParams; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.apache.kafka.streams.test.TestRecord; import org.apache.kafka.test.MockApiProcessor; import org.apache.kafka.test.MockApiProcessorSupplier; @@ -50,9 +54,11 @@ import java.util.Collection; import java.util.HashSet; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import static java.time.Duration.ZERO; import static java.time.Duration.ofMillis; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; public class KStreamKStreamOuterJoinTest { @@ -1308,4 +1314,47 @@ public class KStreamKStreamOuterJoinTest { new KeyValueTimestamp<>(0, "dummy+null", 1103L) ); } + + public static class CapturingStoreSuppliers extends BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers { + + final AtomicReference<KeyValueBytesStoreSupplier> capture = new AtomicReference<>(); + + @Override + public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) { + final KeyValueBytesStoreSupplier result = super.keyValueStore(params); + capture.set(result); + return result; + } + } + + @Test + public void shouldJoinWithNonTimestampedStore() { + final CapturingStoreSuppliers suppliers = new CapturingStoreSuppliers(); + final StreamJoined<Integer, String, String> streamJoined = + StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) + .withDslStoreSuppliers(suppliers); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<Integer, String> stream1; + final KStream<Integer, String> stream2; + final KStream<Integer, String> joined; + final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); + stream1 = builder.stream(topic1, consumed); + stream2 = builder.stream(topic2, consumed); + + joined = stream1.outerJoin( + stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)), + streamJoined + ); + joined.process(supplier); + + // create a TTD so that the topology gets built + try (final TopologyTestDriver ignored = new TopologyTestDriver(builder.build(PROPS), PROPS)) { + assertThat("Expected stream joined to supply builders that create non-timestamped stores", + !WrappedStateStore.isTimestamped(suppliers.capture.get().get())); + } + } }