This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push: new 84f9ad6f40f KAFKA-18479: RocksDBTimeOrderedKeyValueBuffer not initialized correctly (#18490) 84f9ad6f40f is described below commit 84f9ad6f40f61b5b519c1672ec7a8a3e3c0357dc Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Wed Jan 15 10:04:13 2025 -0800 KAFKA-18479: RocksDBTimeOrderedKeyValueBuffer not initialized correctly (#18490) RocksDBTimeOrderedKeyValueBuffer is not initialize with serdes provides via Joined, but always uses serdes from StreamsConfig. Reviewers: Bill Bejeck <b...@confluent.io> --- .../streams/kstream/internals/KStreamImpl.java | 10 ++++-- .../RocksDBTimeOrderedKeyValueBuffer.java | 18 ++++++++++- .../integration/AbstractJoinIntegrationTest.java | 17 ++++++---- .../StreamTableJoinWithGraceIntegrationTest.java | 12 ++++--- .../RocksDBTimeOrderedKeyValueBufferTest.java | 37 ++++++++++++++-------- 5 files changed, 65 insertions(+), 29 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 8cf2d725367..762e232d91e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -1269,8 +1269,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); } bufferStoreName = Optional.of(name + "-Buffer"); - final RocksDBTimeOrderedKeyValueBuffer.Builder<Object, Object> storeBuilder = - new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), joined.gracePeriod(), name); + final RocksDBTimeOrderedKeyValueBuffer.Builder<K, V> storeBuilder = + new RocksDBTimeOrderedKeyValueBuffer.Builder<>( + bufferStoreName.get(), + joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde, + joinedInternal.valueSerde() != null ? joinedInternal.valueSerde() : valueSerde, + joined.gracePeriod(), + name + ); builder.addStateStore(new StoreBuilderWrapper(storeBuilder)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java index 2a8b3393aaa..fcad03d580d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java @@ -65,13 +65,23 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal public static class Builder<K, V> implements StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> { private final String storeName; + private final Serde<K> keySerde; + private final Serde<V> valueSerde; private boolean loggingEnabled = true; private Map<String, String> logConfig = new HashMap<>(); private final Duration grace; private final String topic; - public Builder(final String storeName, final Duration grace, final String topic) { + public Builder( + final String storeName, + final Serde<K> keySerde, + final Serde<V> valueSerde, + final Duration grace, + final String topic + ) { this.storeName = storeName; + this.keySerde = keySerde; + this.valueSerde = valueSerde; this.grace = grace; this.topic = topic; } @@ -116,6 +126,8 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal public TimeOrderedKeyValueBuffer<K, V, V> build() { return new RocksDBTimeOrderedKeyValueBuffer<>( new RocksDBTimeOrderedKeyValueBytesStoreSupplier(storeName).get(), + keySerde, + valueSerde, grace, topic, loggingEnabled); @@ -139,10 +151,14 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store, + final Serde<K> keySerde, + final Serde<V> valueSerde, final Duration gracePeriod, final String topic, final boolean loggingEnabled) { this.store = store; + this.keySerde = keySerde; + this.valueSerde = valueSerde; this.gracePeriod = gracePeriod.toMillis(); minTimestamp = store.minTimestamp(); minValid = false; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index 3b84f286a7d..d9dfc41c511 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -125,10 +125,16 @@ public abstract class AbstractJoinIntegrationTest { final ValueJoiner<String, String, String> valueJoiner = (value1, value2) -> value1 + "-" + value2; Properties setupConfigsAndUtils(final boolean cacheEnabled) { + return setupConfigsAndUtils(cacheEnabled, true); + } + + Properties setupConfigsAndUtils(final boolean cacheEnabled, final boolean setSerdes) { final Properties streamsConfig = new Properties(); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); - streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + if (setSerdes) { + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + } streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); if (!cacheEnabled) { streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); @@ -260,16 +266,13 @@ public abstract class AbstractJoinIntegrationTest { private void checkQueryableStore(final String queryableName, final TestRecord<Long, String> expectedFinalResult, final TopologyTestDriver driver) { final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore(queryableName); - final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all(); - final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next(); + try (final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all()) { + final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next(); - try { assertThat(onlyEntry.key, is(expectedFinalResult.key())); assertThat(onlyEntry.value.value(), is(expectedFinalResult.value())); assertThat(onlyEntry.value.timestamp(), is(expectedFinalResult.timestamp())); assertThat(all.hasNext(), is(false)); - } finally { - all.close(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java index bcfd2445de2..6195cbeb281 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java @@ -19,10 +19,12 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.test.TestRecord; @@ -52,13 +54,13 @@ public class StreamTableJoinWithGraceIntegrationTest extends AbstractJoinIntegra @ValueSource(booleans = {true, false}) public void testInnerWithVersionedStore(final boolean cacheEnabled) { final StreamsBuilder builder = new StreamsBuilder(); - final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT); - final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as( + final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.Long(), Serdes.String())); + final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.as( Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5)))); - final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled); + final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, false); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner"); - leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC); + leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String())); final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList( null, @@ -96,7 +98,7 @@ public class StreamTableJoinWithGraceIntegrationTest extends AbstractJoinIntegra final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT); final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as( Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5)))); - final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled); + final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, true); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left"); leftStream.leftJoin(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java index 33b97af3386..6d9c487520a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; @@ -62,18 +63,16 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @BeforeEach public void setUp() { - when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde()); - when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde()); final Metrics metrics = new Metrics(); offset = 0; streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory()); } - private void createBuffer(final Duration grace) { + private void createBuffer(final Duration grace, final Serde<String> serde) { final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing").get(); - buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, "testing", false); + buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, serde, serde, grace, "testing", false); buffer.setSerdesIfNull(serdeGetter); buffer.init((StateStoreContext) context, store); } @@ -86,14 +85,16 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldReturnIfRecordWasAdded() { - createBuffer(Duration.ofMillis(1)); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ofMillis(1), null); assertThat(pipeRecord("K", "V", 2L), equalTo(true)); assertThat(pipeRecord("K", "V", 0L), equalTo(false)); } @Test public void shouldPutInBufferAndUpdateFields() { - createBuffer(Duration.ofMinutes(1)); + createBuffer(Duration.ofMinutes(1), Serdes.String()); assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0); pipeRecord("1", "0", 0L); assertNumSizeAndTimestamp(buffer, 1, 0, 42); @@ -103,7 +104,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldAddAndEvictRecord() { - createBuffer(Duration.ZERO); + createBuffer(Duration.ZERO, Serdes.String()); final AtomicInteger count = new AtomicInteger(0); pipeRecord("1", "0", 0L); assertNumSizeAndTimestamp(buffer, 1, 0, 42); @@ -114,7 +115,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldAddAndEvictRecordTwice() { - createBuffer(Duration.ZERO); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ZERO, null); final AtomicInteger count = new AtomicInteger(0); pipeRecord("1", "0", 0L); assertNumSizeAndTimestamp(buffer, 1, 0, 42); @@ -130,7 +133,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldAddAndEvictRecordTwiceWithNonZeroGrace() { - createBuffer(Duration.ofMillis(1)); + createBuffer(Duration.ofMillis(1), Serdes.String()); final AtomicInteger count = new AtomicInteger(0); pipeRecord("1", "0", 0L); buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement()); @@ -144,7 +147,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldAddRecordsTwiceAndEvictRecordsOnce() { - createBuffer(Duration.ZERO); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ZERO, null); final AtomicInteger count = new AtomicInteger(0); pipeRecord("1", "0", 0L); buffer.evictWhile(() -> buffer.numRecords() > 1, r -> count.getAndIncrement()); @@ -156,7 +161,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldDropLateRecords() { - createBuffer(Duration.ZERO); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ZERO, null); pipeRecord("1", "0", 1L); assertNumSizeAndTimestamp(buffer, 1, 1, 42); pipeRecord("2", "0", 0L); @@ -165,7 +172,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldDropLateRecordsWithNonZeroGrace() { - createBuffer(Duration.ofMillis(1)); + createBuffer(Duration.ofMillis(1), Serdes.String()); pipeRecord("1", "0", 2L); assertNumSizeAndTimestamp(buffer, 1, 2, 42); pipeRecord("2", "0", 1L); @@ -176,7 +183,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Test public void shouldHandleCollidingKeys() { - createBuffer(Duration.ofMillis(1)); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ofMillis(1), null); final AtomicInteger count = new AtomicInteger(0); pipeRecord("2", "0", 0L); buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement()); @@ -201,4 +210,4 @@ public class RocksDBTimeOrderedKeyValueBufferTest { assertThat(buffer.minTimestamp(), equalTo(time)); assertThat(buffer.bufferSize(), equalTo(size)); } -} \ No newline at end of file +}