This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 1bdf98ec157 KAFKA-13722: Refactor SerdeGetter (#18242)
1bdf98ec157 is described below

commit 1bdf98ec15743b4fc149cad754b94dddc65323a2
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Dec 18 03:01:46 2024 -0800

    KAFKA-13722: Refactor SerdeGetter (#18242)
    
    Reviewers: Bruno Cadonna <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../streams/processor/internals/SerdeGetter.java   | 39 ++++++++++------------
 .../RocksDBTimeOrderedKeyValueBuffer.java          |  4 +--
 .../state/internals/StoreSerdeInitializer.java     | 15 ---------
 .../RocksDBTimeOrderedKeyValueBufferTest.java      |  6 ++--
 .../state/internals/StoreSerdeInitializerTest.java | 16 ++++-----
 5 files changed, 31 insertions(+), 49 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SerdeGetter.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SerdeGetter.java
index 72bfc99804b..74665bc3877 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SerdeGetter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SerdeGetter.java
@@ -18,37 +18,34 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+
+import java.util.function.Supplier;
 
 /**
  * Allows serde access across different context types.
  */
 public class SerdeGetter {
 
-    private final org.apache.kafka.streams.processor.ProcessorContext 
oldProcessorContext;
-    private final org.apache.kafka.streams.processor.api.ProcessorContext 
newProcessorContext;
-    private final StateStoreContext stateStorecontext;
-    public SerdeGetter(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
-        oldProcessorContext = context;
-        newProcessorContext = null;
-        stateStorecontext = null;
-    }
-    public SerdeGetter(final 
org.apache.kafka.streams.processor.api.ProcessorContext context) {
-        oldProcessorContext = null;
-        newProcessorContext = context;
-        stateStorecontext = null;
+    private final Supplier<Serde<?>> keySerdeSupplier;
+    private final Supplier<Serde<?>> valueSerdeSupplier;
+
+    public SerdeGetter(final ProcessorContext<?, ?> context) {
+        keySerdeSupplier = context::keySerde;
+        valueSerdeSupplier = context::valueSerde;
     }
+
     public SerdeGetter(final StateStoreContext context) {
-        oldProcessorContext = null;
-        newProcessorContext = null;
-        stateStorecontext = context;
+        keySerdeSupplier = context::keySerde;
+        valueSerdeSupplier = context::valueSerde;
     }
-    public Serde keySerde() {
-        return oldProcessorContext != null ? oldProcessorContext.keySerde() :
-            newProcessorContext != null ? newProcessorContext.keySerde() : 
stateStorecontext.keySerde();
+
+    public Serde<?> keySerde() {
+        return keySerdeSupplier.get();
     }
-    public Serde valueSerde() {
-        return oldProcessorContext != null ? oldProcessorContext.valueSerde() :
-            newProcessorContext != null ? newProcessorContext.valueSerde() : 
stateStorecontext.valueSerde();
+
+    public Serde<?> valueSerde() {
+        return valueSerdeSupplier.get();
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
index 306d6bf9cfb..001f866e963 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
@@ -58,7 +58,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> 
implements TimeOrderedKeyVal
     private final boolean loggingEnabled;
     private int partition;
     private String changelogTopic;
-    private InternalProcessorContext context;
+    private InternalProcessorContext<?, ?> context;
     private boolean minValid;
 
     public static class Builder<K, V> implements 
StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> {
@@ -156,7 +156,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> 
implements TimeOrderedKeyVal
     @Override
     public void setSerdesIfNull(final SerdeGetter getter) {
         keySerde = keySerde == null ? (Serde<K>) getter.keySerde() : keySerde;
-        valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde;
+        valueSerde = valueSerde == null ? (Serde<V>) getter.valueSerde() : 
valueSerde;
     }
 
     private long observedStreamTime() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java
index cf44ca19bb1..813024fe7c1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java
@@ -20,13 +20,11 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.state.StateSerdes;
 
-
 public class StoreSerdeInitializer {
     static <K, V> StateSerdes<K, V> prepareStoreSerde(final StateStoreContext 
context,
                                                       final String storeName,
@@ -41,19 +39,6 @@ public class StoreSerdeInitializer {
         );
     }
 
-    static <K, V> StateSerdes<K, V> prepareStoreSerde(final ProcessorContext 
context,
-                                                      final String storeName,
-                                                      final String 
changelogTopic,
-                                                      final Serde<K> keySerde,
-                                                      final Serde<V> 
valueSerde,
-                                                      final PrepareFunc<V> 
prepareValueSerdeFunc) {
-        return new StateSerdes<>(
-            changelogTopic,
-            prepareSerde(WrappingNullableUtils::prepareKeySerde, storeName, 
keySerde, new SerdeGetter(context), true, context.taskId()),
-            prepareSerde(prepareValueSerdeFunc, storeName, valueSerde, new 
SerdeGetter(context), false, context.taskId())
-        );
-    }
-
     private static <T> Serde<T> prepareSerde(final PrepareFunc<T> prepare,
                                              final String storeName,
                                              final Serde<T> serde,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 8fbde2f78e1..c1ff46f873e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.processor.StateStoreContext;
@@ -59,10 +60,11 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
     public Sensor sensor;
     public long offset;
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
     @BeforeEach
     public void setUp() {
-        when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde());
-        when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde());
+        when(serdeGetter.keySerde()).thenReturn((Serde) new 
Serdes.StringSerde());
+        when(serdeGetter.valueSerde()).thenReturn((Serde) new 
Serdes.StringSerde());
         final Metrics metrics = new Metrics();
         offset = 0;
         streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", 
"processId", new MockTime());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java
index 2891825ba52..eb85cde807c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java
@@ -21,8 +21,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockInternalNewProcessorContext;
 
@@ -62,7 +60,7 @@ public class StoreSerdeInitializerTest {
         utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), 
any())).thenReturn(valueSerde);
 
         final StateSerdes<String, String> result = 
StoreSerdeInitializer.prepareStoreSerde(
-            (ProcessorContext) context, "myStore", "topic", keySerde, 
valueSerde, WrappingNullableUtils::prepareValueSerde);
+            context, "myStore", "topic", keySerde, valueSerde, 
WrappingNullableUtils::prepareValueSerde);
 
         assertThat(result.keySerde(), equalTo(keySerde));
         assertThat(result.valueSerde(), equalTo(valueSerde));
@@ -77,7 +75,7 @@ public class StoreSerdeInitializerTest {
             .thenThrow(new ConfigException("Please set 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));
 
         final Throwable exception = assertThrows(StreamsException.class,
-            () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) 
context, "myStore", "topic",
+            () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore", 
"topic",
                 new Serdes.StringSerde(), new Serdes.StringSerde(), 
WrappingNullableUtils::prepareValueSerde));
 
         assertThat(exception.getMessage(), equalTo("Failed to initialize key 
serdes for store myStore"));
@@ -92,7 +90,7 @@ public class StoreSerdeInitializerTest {
             .thenThrow(new ConfigException("Please set 
StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG"));
 
         final Throwable exception = assertThrows(StreamsException.class,
-            () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) 
context, "myStore", "topic",
+            () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore", 
"topic",
                 new Serdes.StringSerde(), new Serdes.StringSerde(), 
WrappingNullableUtils::prepareValueSerde));
 
         assertThat(exception.getMessage(), equalTo("Failed to initialize value 
serdes for store myStore"));
@@ -107,7 +105,7 @@ public class StoreSerdeInitializerTest {
             .thenThrow(new ConfigException("Please set 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));
 
         final Throwable exception = assertThrows(StreamsException.class,
-            () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) 
context, "myStore", "topic",
+            () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore", 
"topic",
                 new Serdes.StringSerde(), new Serdes.StringSerde(), 
WrappingNullableUtils::prepareValueSerde));
 
         assertThat(exception.getMessage(), equalTo("Failed to initialize key 
serdes for store myStore"));
@@ -122,7 +120,7 @@ public class StoreSerdeInitializerTest {
             .thenThrow(new ConfigException("Please set 
StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG"));
 
         final Throwable exception = assertThrows(StreamsException.class,
-            () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) 
context, "myStore", "topic",
+            () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore", 
"topic",
                 new Serdes.StringSerde(), new Serdes.StringSerde(), 
WrappingNullableUtils::prepareValueSerde));
 
         assertThat(exception.getMessage(), equalTo("Failed to initialize value 
serdes for store myStore"));
@@ -136,7 +134,7 @@ public class StoreSerdeInitializerTest {
         utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), 
any())).thenThrow(new StreamsException(""));
 
         final Throwable exception = assertThrows(StreamsException.class,
-            () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) 
context, "myStore", "topic",
+            () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore", 
"topic",
                 new Serdes.StringSerde(), new Serdes.StringSerde(), 
WrappingNullableUtils::prepareValueSerde));
 
         assertThat(exception.getMessage(), equalTo("Failed to initialize key 
serdes for store myStore"));
@@ -149,7 +147,7 @@ public class StoreSerdeInitializerTest {
         utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), 
any())).thenThrow(new StreamsException(""));
 
         final Throwable exception = assertThrows(StreamsException.class,
-            () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) 
context, "myStore", "topic",
+            () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore", 
"topic",
                 new Serdes.StringSerde(), new Serdes.StringSerde(), 
WrappingNullableUtils::prepareValueSerde));
 
         assertThat(exception.getMessage(), equalTo("Failed to initialize value 
serdes for store myStore"));

Reply via email to