This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 84f9ad6f40f KAFKA-18479: RocksDBTimeOrderedKeyValueBuffer not initialized correctly (#18490)
84f9ad6f40f is described below

commit 84f9ad6f40f61b5b519c1672ec7a8a3e3c0357dc
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Wed Jan 15 10:04:13 2025 -0800

    KAFKA-18479: RocksDBTimeOrderedKeyValueBuffer not initialized correctly (#18490)
    
    RocksDBTimeOrderedKeyValueBuffer is not initialized with the serdes
    provided via Joined, but always uses the serdes from StreamsConfig.
    
    Reviewers: Bill Bejeck <b...@confluent.io>
---
 .../streams/kstream/internals/KStreamImpl.java     | 10 ++++--
 .../RocksDBTimeOrderedKeyValueBuffer.java          | 18 ++++++++++-
 .../integration/AbstractJoinIntegrationTest.java   | 17 ++++++----
 .../StreamTableJoinWithGraceIntegrationTest.java   | 12 ++++---
 .../RocksDBTimeOrderedKeyValueBufferTest.java      | 37 ++++++++++++++--------
 5 files changed, 65 insertions(+), 29 deletions(-)
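
For context, a minimal sketch (not part of this commit) of the pattern the fix
targets: a stream-table join with a grace period whose serdes are supplied only
through Consumed/Joined/Produced rather than as StreamsConfig defaults. Topic
names, the store name, and the grace period below are illustrative assumptions,
and the sketch assumes the Joined#withGracePeriod API from KIP-923. Before this
fix, the join's buffer store ignored the Joined serdes and always fell back to
the StreamsConfig defaults.

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

public class GraceJoinSerdeExample {

    public static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<Long, String> leftStream =
            builder.stream("left-input", Consumed.with(Serdes.Long(), Serdes.String()));

        // A grace-period stream-table join requires the KTable to be materialized
        // as a versioned store.
        final KTable<Long, String> rightTable = builder.table(
            "right-input",
            Consumed.with(Serdes.Long(), Serdes.String()),
            Materialized.as(Stores.persistentVersionedKeyValueStore("right-store", Duration.ofMinutes(5))));

        // Serdes are passed via Joined; with this fix the buffer store backing the
        // grace period uses them instead of the StreamsConfig default serdes.
        leftStream
            .join(
                rightTable,
                (leftValue, rightValue) -> leftValue + "-" + rightValue,
                Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(), "grace-join")
                    .withGracePeriod(Duration.ofSeconds(10)))
            .to("output", Produced.with(Serdes.Long(), Serdes.String()));

        return builder.build();
    }
}

This mirrors what the updated StreamTableJoinWithGraceIntegrationTest below now
exercises: it no longer sets default serdes in StreamsConfig, so the join can
only work if the operator-level serdes are actually honored.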

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 8cf2d725367..762e232d91e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -1269,8 +1269,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
                 throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
             }
             bufferStoreName = Optional.of(name + "-Buffer");
-            final RocksDBTimeOrderedKeyValueBuffer.Builder<Object, Object> storeBuilder =
-                    new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), joined.gracePeriod(), name);
+            final RocksDBTimeOrderedKeyValueBuffer.Builder<K, V> storeBuilder =
+                    new RocksDBTimeOrderedKeyValueBuffer.Builder<>(
+                        bufferStoreName.get(),
+                        joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde,
+                        joinedInternal.valueSerde() != null ? joinedInternal.valueSerde() : valueSerde,
+                        joined.gracePeriod(),
+                        name
+                    );
             builder.addStateStore(new StoreBuilderWrapper(storeBuilder));
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
index 2a8b3393aaa..fcad03d580d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
@@ -65,13 +65,23 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal
     public static class Builder<K, V> implements StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> {
 
         private final String storeName;
+        private final Serde<K> keySerde;
+        private final Serde<V> valueSerde;
         private boolean loggingEnabled = true;
         private Map<String, String> logConfig = new HashMap<>();
         private final Duration grace;
         private final String topic;
 
-        public Builder(final String storeName, final Duration grace, final String topic) {
+        public Builder(
+            final String storeName,
+            final Serde<K> keySerde,
+            final Serde<V> valueSerde,
+            final Duration grace,
+            final String topic
+        ) {
             this.storeName = storeName;
+            this.keySerde = keySerde;
+            this.valueSerde = valueSerde;
             this.grace = grace;
             this.topic = topic;
         }
@@ -116,6 +126,8 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal
         public TimeOrderedKeyValueBuffer<K, V, V> build() {
             return new RocksDBTimeOrderedKeyValueBuffer<>(
                 new RocksDBTimeOrderedKeyValueBytesStoreSupplier(storeName).get(),
+                keySerde,
+                valueSerde,
                 grace,
                 topic,
                 loggingEnabled);
@@ -139,10 +151,14 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal
 
 
     public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store,
+                                            final Serde<K> keySerde,
+                                            final Serde<V> valueSerde,
                                             final Duration gracePeriod,
                                             final String topic,
                                             final boolean loggingEnabled) {
         this.store = store;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
         this.gracePeriod = gracePeriod.toMillis();
         minTimestamp = store.minTimestamp();
         minValid = false;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 3b84f286a7d..d9dfc41c511 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -125,10 +125,16 @@ public abstract class AbstractJoinIntegrationTest {
     final ValueJoiner<String, String, String> valueJoiner = (value1, value2) -> value1 + "-" + value2;
 
     Properties setupConfigsAndUtils(final boolean cacheEnabled) {
+        return setupConfigsAndUtils(cacheEnabled, true);
+    }
+
+    Properties setupConfigsAndUtils(final boolean cacheEnabled, final boolean setSerdes) {
         final Properties streamsConfig = new Properties();
         streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
-        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        if (setSerdes) {
+            streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class);
+            streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        }
         streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
         if (!cacheEnabled) {
             streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
@@ -260,16 +266,13 @@ public abstract class AbstractJoinIntegrationTest {
     private void checkQueryableStore(final String queryableName, final TestRecord<Long, String> expectedFinalResult, final TopologyTestDriver driver) {
         final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore(queryableName);
 
-        final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all();
-        final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next();
+        try (final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all()) {
+            final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next();
 
-        try {
             assertThat(onlyEntry.key, is(expectedFinalResult.key()));
             assertThat(onlyEntry.value.value(), is(expectedFinalResult.value()));
             assertThat(onlyEntry.value.timestamp(), is(expectedFinalResult.timestamp()));
             assertThat(all.hasNext(), is(false));
-        } finally {
-            all.close();
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
index bcfd2445de2..6195cbeb281 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
@@ -19,10 +19,12 @@ package org.apache.kafka.streams.integration;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.TestRecord;
 
@@ -52,13 +54,13 @@ public class StreamTableJoinWithGraceIntegrationTest extends AbstractJoinIntegra
     @ValueSource(booleans = {true, false})
     public void testInnerWithVersionedStore(final boolean cacheEnabled) {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT);
-        final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
+        final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.Long(), Serdes.String()));
+        final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.as(
                 Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5))));
-        final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
+        final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, false);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner");
 
-        leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC);
+        leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String()));
 
         final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
             null,
@@ -96,7 +98,7 @@ public class StreamTableJoinWithGraceIntegrationTest extends AbstractJoinIntegra
         final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT);
         final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
                 Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5))));
-        final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
+        final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, true);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left");
         leftStream.leftJoin(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 33b97af3386..6d9c487520a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
@@ -62,18 +63,16 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @BeforeEach
     public void setUp() {
-        when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde());
-        when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde());
         final Metrics metrics = new Metrics();
         offset = 0;
         streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
         context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory());
     }
 
-    private void createBuffer(final Duration grace) {
+    private void createBuffer(final Duration grace, final Serde<String> serde) {
         final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing").get();
 
-        buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, "testing", false);
+        buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, serde, serde, grace, "testing", false);
         buffer.setSerdesIfNull(serdeGetter);
         buffer.init((StateStoreContext) context, store);
     }
@@ -86,14 +85,16 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldReturnIfRecordWasAdded() {
-        createBuffer(Duration.ofMillis(1));
+        when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
+        when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
+        createBuffer(Duration.ofMillis(1), null);
         assertThat(pipeRecord("K", "V", 2L), equalTo(true));
         assertThat(pipeRecord("K", "V", 0L), equalTo(false));
     }
 
     @Test
     public void shouldPutInBufferAndUpdateFields() {
-        createBuffer(Duration.ofMinutes(1));
+        createBuffer(Duration.ofMinutes(1), Serdes.String());
         assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0);
         pipeRecord("1", "0", 0L);
         assertNumSizeAndTimestamp(buffer, 1, 0, 42);
@@ -103,7 +104,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldAddAndEvictRecord() {
-        createBuffer(Duration.ZERO);
+        createBuffer(Duration.ZERO, Serdes.String());
         final AtomicInteger count = new AtomicInteger(0);
         pipeRecord("1", "0", 0L);
         assertNumSizeAndTimestamp(buffer, 1, 0, 42);
@@ -114,7 +115,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldAddAndEvictRecordTwice() {
-        createBuffer(Duration.ZERO);
+        when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
+        when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
+        createBuffer(Duration.ZERO, null);
         final AtomicInteger count = new AtomicInteger(0);
         pipeRecord("1", "0", 0L);
         assertNumSizeAndTimestamp(buffer, 1, 0, 42);
@@ -130,7 +133,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldAddAndEvictRecordTwiceWithNonZeroGrace() {
-        createBuffer(Duration.ofMillis(1));
+        createBuffer(Duration.ofMillis(1), Serdes.String());
         final AtomicInteger count = new AtomicInteger(0);
         pipeRecord("1", "0", 0L);
         buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement());
@@ -144,7 +147,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldAddRecordsTwiceAndEvictRecordsOnce() {
-        createBuffer(Duration.ZERO);
+        when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
+        when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
+        createBuffer(Duration.ZERO, null);
         final AtomicInteger count = new AtomicInteger(0);
         pipeRecord("1", "0", 0L);
         buffer.evictWhile(() -> buffer.numRecords() > 1, r -> count.getAndIncrement());
@@ -156,7 +161,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldDropLateRecords() {
-        createBuffer(Duration.ZERO);
+        when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
+        when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
+        createBuffer(Duration.ZERO, null);
         pipeRecord("1", "0", 1L);
         assertNumSizeAndTimestamp(buffer, 1, 1, 42);
         pipeRecord("2", "0", 0L);
@@ -165,7 +172,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldDropLateRecordsWithNonZeroGrace() {
-        createBuffer(Duration.ofMillis(1));
+        createBuffer(Duration.ofMillis(1), Serdes.String());
         pipeRecord("1", "0", 2L);
         assertNumSizeAndTimestamp(buffer, 1, 2, 42);
         pipeRecord("2", "0", 1L);
@@ -176,7 +183,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldHandleCollidingKeys() {
-        createBuffer(Duration.ofMillis(1));
+        when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
+        when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
+        createBuffer(Duration.ofMillis(1), null);
         final AtomicInteger count = new AtomicInteger(0);
         pipeRecord("2", "0", 0L);
         buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement());
@@ -201,4 +210,4 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
         assertThat(buffer.minTimestamp(), equalTo(time));
         assertThat(buffer.bufferSize(), equalTo(size));
     }
-}
\ No newline at end of file
+}
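
A quick, hypothetical way to check the behavior locally (not part of this
commit) is to drive the sketch from above with TopologyTestDriver while
configuring no default serdes, mirroring what the updated integration test
does. GraceJoinSerdeExample, the topic names, the timestamps, and the expected
output are assumptions carried over from that sketch, and the comment about
when the buffered record is emitted reflects the intended grace-period
semantics rather than a guaranteed trace.

import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

public class GraceJoinSerdeCheck {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "grace-join-serde-check");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // not contacted by the test driver
        // Deliberately no DEFAULT_KEY_SERDE_CLASS_CONFIG / DEFAULT_VALUE_SERDE_CLASS_CONFIG:
        // the topology must rely on the serdes passed via Consumed/Joined/Produced.

        try (final TopologyTestDriver driver =
                 new TopologyTestDriver(GraceJoinSerdeExample.buildTopology(), props)) {
            final TestInputTopic<Long, String> right = driver.createInputTopic(
                "right-input", new LongSerializer(), new StringSerializer());
            final TestInputTopic<Long, String> left = driver.createInputTopic(
                "left-input", new LongSerializer(), new StringSerializer());
            final TestOutputTopic<Long, String> output = driver.createOutputTopic(
                "output", new LongDeserializer(), new StringDeserializer());

            right.pipeInput(1L, "right", 0L);
            left.pipeInput(1L, "left", 0L);
            // Advance stream time well past the 10s grace period so the buffered
            // record for key 1 is evicted and joined against the table.
            left.pipeInput(2L, "advance-time", 20_000L);

            // Expected, roughly: [KeyValue(1, left-right)]
            System.out.println(output.readKeyValuesToList());
        }
    }
}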
