This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 8c3f0886a14 KAFKA-18479: RocksDBTimeOrderedKeyValueBuffer not initialized correctly (#18490) 8c3f0886a14 is described below commit 8c3f0886a14aeb3876a8c2f5cea79213d0b35d4e Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Wed Jan 15 10:04:13 2025 -0800 KAFKA-18479: RocksDBTimeOrderedKeyValueBuffer not initialized correctly (#18490) RocksDBTimeOrderedKeyValueBuffer is not initialize with serdes provides via Joined, but always uses serdes from StreamsConfig. Reviewers: Bill Bejeck <b...@confluent.io> --- .../integration/AbstractJoinIntegrationTest.java | 17 ++++++----- .../StreamTableJoinWithGraceIntegrationTest.java | 12 ++++---- .../streams/kstream/internals/KStreamImpl.java | 8 ++++- .../RocksDBTimeOrderedKeyValueBuffer.java | 18 +++++++++++- .../RocksDBTimeOrderedKeyValueBufferTest.java | 34 +++++++++++++--------- 5 files changed, 62 insertions(+), 27 deletions(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index 3b84f286a7d..d9dfc41c511 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -125,10 +125,16 @@ public abstract class AbstractJoinIntegrationTest { final ValueJoiner<String, String, String> valueJoiner = (value1, value2) -> value1 + "-" + value2; Properties setupConfigsAndUtils(final boolean cacheEnabled) { + return setupConfigsAndUtils(cacheEnabled, true); + } + + Properties setupConfigsAndUtils(final boolean cacheEnabled, final boolean setSerdes) { final Properties streamsConfig = new Properties(); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); - streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + if (setSerdes) { + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + } streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); if (!cacheEnabled) { streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); @@ -260,16 +266,13 @@ public abstract class AbstractJoinIntegrationTest { private void checkQueryableStore(final String queryableName, final TestRecord<Long, String> expectedFinalResult, final TopologyTestDriver driver) { final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore(queryableName); - final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all(); - final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next(); + try (final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all()) { + final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next(); - try { assertThat(onlyEntry.key, is(expectedFinalResult.key())); assertThat(onlyEntry.value.value(), is(expectedFinalResult.value())); assertThat(onlyEntry.value.timestamp(), is(expectedFinalResult.timestamp())); assertThat(all.hasNext(), is(false)); - } finally { - all.close(); } } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java index bcfd2445de2..6195cbeb281 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java @@ -19,10 +19,12 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.test.TestRecord; @@ -52,13 +54,13 @@ public class StreamTableJoinWithGraceIntegrationTest extends AbstractJoinIntegra @ValueSource(booleans = {true, false}) public void testInnerWithVersionedStore(final boolean cacheEnabled) { final StreamsBuilder builder = new StreamsBuilder(); - final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT); - final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as( + final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.Long(), Serdes.String())); + final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.as( Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5)))); - final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled); + final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, false); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner"); - leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC); + leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String())); final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList( null, @@ -96,7 +98,7 @@ public class StreamTableJoinWithGraceIntegrationTest extends AbstractJoinIntegra final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT); final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as( Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5)))); - final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled); + final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, true); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left"); leftStream.leftJoin(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 7deb9468c3f..7beaa1abffb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -1162,7 +1162,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); } final String bufferName = name + "-Buffer"; - bufferStoreBuilder = Optional.of(new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferName, joinedInternal.gracePeriod(), name)); + bufferStoreBuilder = Optional.of(new RocksDBTimeOrderedKeyValueBuffer.Builder<>( + bufferName, + joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde, + joinedInternal.leftValueSerde() != null ? joinedInternal.leftValueSerde() : valueSerde, + joinedInternal.gracePeriod(), + name) + ); } final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new KStreamKTableJoin<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java index 04ae4af7ae5..26065bf0fe3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java @@ -64,13 +64,23 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal public static class Builder<K, V> implements StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> { private final String storeName; + private final Serde<K> keySerde; + private final Serde<V> valueSerde; private boolean loggingEnabled = true; private Map<String, String> logConfig = new HashMap<>(); private final Duration grace; private final String topic; - public Builder(final String storeName, final Duration grace, final String topic) { + public Builder( + final String storeName, + final Serde<K> keySerde, + final Serde<V> valueSerde, + final Duration grace, + final String topic + ) { this.storeName = storeName; + this.keySerde = keySerde; + this.valueSerde = valueSerde; this.grace = grace; this.topic = topic; } @@ -115,6 +125,8 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal public TimeOrderedKeyValueBuffer<K, V, V> build() { return new RocksDBTimeOrderedKeyValueBuffer<>( new RocksDBTimeOrderedKeyValueBytesStoreSupplier(storeName).get(), + keySerde, + valueSerde, grace, topic, loggingEnabled); @@ -138,10 +150,14 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store, + final Serde<K> keySerde, + final Serde<V> valueSerde, final Duration gracePeriod, final String topic, final boolean loggingEnabled) { this.store = store; + this.keySerde = keySerde; + this.valueSerde = valueSerde; this.gracePeriod = gracePeriod.toMillis(); minTimestamp = store.minTimestamp(); minValid = false; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java index 936901cdc3a..9a92df55336 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java @@ -62,18 +62,16 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @SuppressWarnings({"rawtypes", "unchecked"}) @BeforeEach public void setUp() { - when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); - when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); final Metrics metrics = new Metrics(); offset = 0; streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime()); context = new MockInternalProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory()); } - private void createBuffer(final Duration grace) { + private void createBuffer(final Duration grace, final Serde<String> serde) { final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing").get(); - buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, "testing", false); + buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, serde, serde, grace, "testing", false); buffer.setSerdesIfNull(serdeGetter); buffer.init(context, store); } @@ -86,14 +84,16 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldReturnIfRecordWasAdded() { - createBuffer(Duration.ofMillis(1)); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ofMillis(1), null); assertThat(pipeRecord("K", "V", 2L), equalTo(true)); assertThat(pipeRecord("K", "V", 0L), equalTo(false)); } @Test public void shouldPutInBufferAndUpdateFields() { - createBuffer(Duration.ofMinutes(1)); + createBuffer(Duration.ofMinutes(1), Serdes.String()); assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0); pipeRecord("1", "0", 0L); assertNumSizeAndTimestamp(buffer, 1, 0, 42); @@ -103,7 +103,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldAddAndEvictRecord() { - createBuffer(Duration.ZERO); + createBuffer(Duration.ZERO, Serdes.String()); final AtomicInteger count = new AtomicInteger(0); pipeRecord("1", "0", 0L); assertNumSizeAndTimestamp(buffer, 1, 0, 42); @@ -114,7 +114,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldAddAndEvictRecordTwice() { - createBuffer(Duration.ZERO); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ZERO, null); final AtomicInteger count = new AtomicInteger(0); pipeRecord("1", "0", 0L); assertNumSizeAndTimestamp(buffer, 1, 0, 42); @@ -130,7 +132,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldAddAndEvictRecordTwiceWithNonZeroGrace() { - createBuffer(Duration.ofMillis(1)); + createBuffer(Duration.ofMillis(1), Serdes.String()); final AtomicInteger count = new AtomicInteger(0); pipeRecord("1", "0", 0L); buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement()); @@ -144,7 +146,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldAddRecordsTwiceAndEvictRecordsOnce() { - createBuffer(Duration.ZERO); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ZERO, null); final AtomicInteger count = new AtomicInteger(0); pipeRecord("1", "0", 0L); buffer.evictWhile(() -> buffer.numRecords() > 1, r -> count.getAndIncrement()); @@ -156,7 +160,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldDropLateRecords() { - createBuffer(Duration.ZERO); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ZERO, null); pipeRecord("1", "0", 1L); assertNumSizeAndTimestamp(buffer, 1, 1, 42); pipeRecord("2", "0", 0L); @@ -165,7 +171,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldDropLateRecordsWithNonZeroGrace() { - createBuffer(Duration.ofMillis(1)); + createBuffer(Duration.ofMillis(1), Serdes.String()); pipeRecord("1", "0", 2L); assertNumSizeAndTimestamp(buffer, 1, 2, 42); pipeRecord("2", "0", 1L); @@ -176,7 +182,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldHandleCollidingKeys() { - createBuffer(Duration.ofMillis(1)); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ofMillis(1), null); final AtomicInteger count = new AtomicInteger(0); pipeRecord("2", "0", 0L); buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement());