This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 30f94b53207 KAFKA-18479: RocksDBTimeOrderedKeyValueBuffer not initialized correctly (#18490)
30f94b53207 is described below

commit 30f94b53207d87f195178bd35fb29ef9eb74df9a
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Wed Jan 15 10:04:13 2025 -0800

    KAFKA-18479: RocksDBTimeOrderedKeyValueBuffer not initialized correctly (#18490)
    
    RocksDBTimeOrderedKeyValueBuffer is not initialized with the serdes provided
    via Joined, but always uses the serdes from StreamsConfig.
    
    Reviewers: Bill Bejeck <b...@confluent.io>
---
 .../integration/AbstractJoinIntegrationTest.java   | 17 ++++++-----
 .../StreamTableJoinWithGraceIntegrationTest.java   | 12 ++++----
 .../streams/kstream/internals/KStreamImpl.java     |  8 ++++-
 .../RocksDBTimeOrderedKeyValueBuffer.java          | 18 +++++++++++-
 .../RocksDBTimeOrderedKeyValueBufferTest.java      | 34 +++++++++++++---------
 5 files changed, 62 insertions(+), 27 deletions(-)
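
For context, a minimal sketch of the scenario this change fixes: a stream-table
join with a grace period where the key and value serdes are supplied only
through the DSL (Consumed/Joined) rather than via StreamsConfig defaults. The
topic, store, and class names below are illustrative, and the
Joined#withGracePeriod call assumes the KIP-923 API; this mirrors the updated
StreamTableJoinWithGraceIntegrationTest rather than code from the patch itself.

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

public class StreamTableJoinWithGraceExample {

    public static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Serdes come only from Consumed/Joined; no default serdes are set in StreamsConfig.
        final KStream<Long, String> leftStream =
            builder.stream("left-topic", Consumed.with(Serdes.Long(), Serdes.String()));

        // The table must be versioned to use a grace period in a stream-table join.
        final KTable<Long, String> rightTable = builder.table(
            "right-topic",
            Consumed.with(Serdes.Long(), Serdes.String()),
            Materialized.as(Stores.persistentVersionedKeyValueStore("right-store", Duration.ofMinutes(5))));

        // With this fix, the serdes passed via Joined also initialize the
        // RocksDBTimeOrderedKeyValueBuffer that backs the grace period, instead
        // of always falling back to the (possibly absent) StreamsConfig defaults.
        final Joined<Long, String, String> joined =
            Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(), "stream-table-join")
                  .withGracePeriod(Duration.ofMinutes(1));

        leftStream.join(rightTable, (leftValue, rightValue) -> leftValue + "-" + rightValue, joined)
                  .to("output-topic", Produced.with(Serdes.Long(), Serdes.String()));

        return builder.build();
    }
}

Before this commit, a topology like the sketch above did not pick up the serdes
given via Joined for the join's time-ordered buffer, which is why the updated
integration test now runs without default serdes in StreamsConfig.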

diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 3b84f286a7d..d9dfc41c511 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -125,10 +125,16 @@ public abstract class AbstractJoinIntegrationTest {
     final ValueJoiner<String, String, String> valueJoiner = (value1, value2) -> value1 + "-" + value2;
 
     Properties setupConfigsAndUtils(final boolean cacheEnabled) {
+        return setupConfigsAndUtils(cacheEnabled, true);
+    }
+
+    Properties setupConfigsAndUtils(final boolean cacheEnabled, final boolean setSerdes) {
         final Properties streamsConfig = new Properties();
         streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
-        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        if (setSerdes) {
+            streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class);
+            streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        }
         streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
         if (!cacheEnabled) {
             streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
@@ -260,16 +266,13 @@ public abstract class AbstractJoinIntegrationTest {
     private void checkQueryableStore(final String queryableName, final TestRecord<Long, String> expectedFinalResult, final TopologyTestDriver driver) {
         final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore(queryableName);
 
-        final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all();
-        final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next();
+        try (final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all()) {
+            final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next();
 
-        try {
             assertThat(onlyEntry.key, is(expectedFinalResult.key()));
             assertThat(onlyEntry.value.value(), is(expectedFinalResult.value()));
             assertThat(onlyEntry.value.timestamp(), is(expectedFinalResult.timestamp()));
             assertThat(all.hasNext(), is(false));
-        } finally {
-            all.close();
         }
     }
 
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
index bcfd2445de2..6195cbeb281 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
@@ -19,10 +19,12 @@ package org.apache.kafka.streams.integration;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.TestRecord;
 
@@ -52,13 +54,13 @@ public class StreamTableJoinWithGraceIntegrationTest extends AbstractJoinIntegra
     @ValueSource(booleans = {true, false})
     public void testInnerWithVersionedStore(final boolean cacheEnabled) {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT);
-        final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
+        final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.Long(), Serdes.String()));
+        final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.as(
                 Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5))));
-        final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
+        final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, false);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner");
 
-        leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC);
+        leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String()));
 
         final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
             null,
@@ -96,7 +98,7 @@ public class StreamTableJoinWithGraceIntegrationTest extends AbstractJoinIntegra
         final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT);
         final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
                 Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5))));
-        final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
+        final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, true);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left");
         leftStream.leftJoin(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7deb9468c3f..7beaa1abffb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -1162,7 +1162,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
                 throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
             }
             final String bufferName = name + "-Buffer";
-            bufferStoreBuilder = Optional.of(new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferName, joinedInternal.gracePeriod(), name));
+            bufferStoreBuilder = Optional.of(new RocksDBTimeOrderedKeyValueBuffer.Builder<>(
+                bufferName,
+                joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde,
+                joinedInternal.leftValueSerde() != null ? joinedInternal.leftValueSerde() : valueSerde,
+                joinedInternal.gracePeriod(),
+                name)
+            );
         }
 
         final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new KStreamKTableJoin<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
index 04ae4af7ae5..26065bf0fe3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
@@ -64,13 +64,23 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal
     public static class Builder<K, V> implements StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> {
 
         private final String storeName;
+        private final Serde<K> keySerde;
+        private final Serde<V> valueSerde;
         private boolean loggingEnabled = true;
         private Map<String, String> logConfig = new HashMap<>();
         private final Duration grace;
         private final String topic;
 
-        public Builder(final String storeName, final Duration grace, final String topic) {
+        public Builder(
+            final String storeName,
+            final Serde<K> keySerde,
+            final Serde<V> valueSerde,
+            final Duration grace,
+            final String topic
+        ) {
             this.storeName = storeName;
+            this.keySerde = keySerde;
+            this.valueSerde = valueSerde;
             this.grace = grace;
             this.topic = topic;
         }
@@ -115,6 +125,8 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal
         public TimeOrderedKeyValueBuffer<K, V, V> build() {
             return new RocksDBTimeOrderedKeyValueBuffer<>(
                 new RocksDBTimeOrderedKeyValueBytesStoreSupplier(storeName).get(),
+                keySerde,
+                valueSerde,
                 grace,
                 topic,
                 loggingEnabled);
@@ -138,10 +150,14 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal
 
 
     public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store,
+                                            final Serde<K> keySerde,
+                                            final Serde<V> valueSerde,
                                             final Duration gracePeriod,
                                             final String topic,
                                             final boolean loggingEnabled) {
         this.store = store;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
         this.gracePeriod = gracePeriod.toMillis();
         minTimestamp = store.minTimestamp();
         minValid = false;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 936901cdc3a..9a92df55336 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -62,18 +62,16 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
     @SuppressWarnings({"rawtypes", "unchecked"})
     @BeforeEach
     public void setUp() {
-        when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
-        when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
         final Metrics metrics = new Metrics();
         offset = 0;
         streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
         context = new MockInternalProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory());
     }
 
-    private void createBuffer(final Duration grace) {
+    private void createBuffer(final Duration grace, final Serde<String> serde) {
         final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing").get();
 
-        buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, "testing", false);
+        buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, serde, serde, grace, "testing", false);
         buffer.setSerdesIfNull(serdeGetter);
         buffer.init(context, store);
     }
@@ -86,14 +84,16 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldReturnIfRecordWasAdded() {
-        createBuffer(Duration.ofMillis(1));
+        when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
+        when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
+        createBuffer(Duration.ofMillis(1), null);
         assertThat(pipeRecord("K", "V", 2L), equalTo(true));
         assertThat(pipeRecord("K", "V", 0L), equalTo(false));
     }
 
     @Test
     public void shouldPutInBufferAndUpdateFields() {
-        createBuffer(Duration.ofMinutes(1));
+        createBuffer(Duration.ofMinutes(1), Serdes.String());
         assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0);
         pipeRecord("1", "0", 0L);
         assertNumSizeAndTimestamp(buffer, 1, 0, 42);
@@ -103,7 +103,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldAddAndEvictRecord() {
-        createBuffer(Duration.ZERO);
+        createBuffer(Duration.ZERO, Serdes.String());
         final AtomicInteger count = new AtomicInteger(0);
         pipeRecord("1", "0", 0L);
         assertNumSizeAndTimestamp(buffer, 1, 0, 42);
@@ -114,7 +114,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldAddAndEvictRecordTwice() {
-        createBuffer(Duration.ZERO);
+        when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
+        when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
+        createBuffer(Duration.ZERO, null);
         final AtomicInteger count = new AtomicInteger(0);
         pipeRecord("1", "0", 0L);
         assertNumSizeAndTimestamp(buffer, 1, 0, 42);
@@ -130,7 +132,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldAddAndEvictRecordTwiceWithNonZeroGrace() {
-        createBuffer(Duration.ofMillis(1));
+        createBuffer(Duration.ofMillis(1), Serdes.String());
         final AtomicInteger count = new AtomicInteger(0);
         pipeRecord("1", "0", 0L);
         buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement());
@@ -144,7 +146,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldAddRecordsTwiceAndEvictRecordsOnce() {
-        createBuffer(Duration.ZERO);
+        when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
+        when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
+        createBuffer(Duration.ZERO, null);
         final AtomicInteger count = new AtomicInteger(0);
         pipeRecord("1", "0", 0L);
         buffer.evictWhile(() -> buffer.numRecords() > 1, r -> count.getAndIncrement());
@@ -156,7 +160,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldDropLateRecords() {
-        createBuffer(Duration.ZERO);
+        when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
+        when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
+        createBuffer(Duration.ZERO, null);
         pipeRecord("1", "0", 1L);
         assertNumSizeAndTimestamp(buffer, 1, 1, 42);
         pipeRecord("2", "0", 0L);
@@ -165,7 +171,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldDropLateRecordsWithNonZeroGrace() {
-        createBuffer(Duration.ofMillis(1));
+        createBuffer(Duration.ofMillis(1), Serdes.String());
         pipeRecord("1", "0", 2L);
         assertNumSizeAndTimestamp(buffer, 1, 2, 42);
         pipeRecord("2", "0", 1L);
@@ -176,7 +182,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
 
     @Test
     public void shouldHandleCollidingKeys() {
-        createBuffer(Duration.ofMillis(1));
+        when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
+        when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
+        createBuffer(Duration.ofMillis(1), null);
         final AtomicInteger count = new AtomicInteger(0);
         pipeRecord("2", "0", 0L);
         buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement());
