This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 1bdf98ec157 KAFKA-13722: Refactor SerdeGetter (#18242)
1bdf98ec157 is described below
commit 1bdf98ec15743b4fc149cad754b94dddc65323a2
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Dec 18 03:01:46 2024 -0800
KAFKA-13722: Refactor SerdeGetter (#18242)
Reviewers: Bruno Cadonna <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../streams/processor/internals/SerdeGetter.java | 39 ++++++++++------------
.../RocksDBTimeOrderedKeyValueBuffer.java | 4 +--
.../state/internals/StoreSerdeInitializer.java | 15 ---------
.../RocksDBTimeOrderedKeyValueBufferTest.java | 6 ++--
.../state/internals/StoreSerdeInitializerTest.java | 16 ++++-----
5 files changed, 31 insertions(+), 49 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SerdeGetter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SerdeGetter.java
index 72bfc99804b..74665bc3877 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SerdeGetter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SerdeGetter.java
@@ -18,37 +18,34 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+
+import java.util.function.Supplier;
/**
* Allows serde access across different context types.
*/
public class SerdeGetter {
- private final org.apache.kafka.streams.processor.ProcessorContext
oldProcessorContext;
- private final org.apache.kafka.streams.processor.api.ProcessorContext
newProcessorContext;
- private final StateStoreContext stateStorecontext;
- public SerdeGetter(final
org.apache.kafka.streams.processor.ProcessorContext context) {
- oldProcessorContext = context;
- newProcessorContext = null;
- stateStorecontext = null;
- }
- public SerdeGetter(final
org.apache.kafka.streams.processor.api.ProcessorContext context) {
- oldProcessorContext = null;
- newProcessorContext = context;
- stateStorecontext = null;
+ private final Supplier<Serde<?>> keySerdeSupplier;
+ private final Supplier<Serde<?>> valueSerdeSupplier;
+
+ public SerdeGetter(final ProcessorContext<?, ?> context) {
+ keySerdeSupplier = context::keySerde;
+ valueSerdeSupplier = context::valueSerde;
}
+
public SerdeGetter(final StateStoreContext context) {
- oldProcessorContext = null;
- newProcessorContext = null;
- stateStorecontext = context;
+ keySerdeSupplier = context::keySerde;
+ valueSerdeSupplier = context::valueSerde;
}
- public Serde keySerde() {
- return oldProcessorContext != null ? oldProcessorContext.keySerde() :
- newProcessorContext != null ? newProcessorContext.keySerde() :
stateStorecontext.keySerde();
+
+ public Serde<?> keySerde() {
+ return keySerdeSupplier.get();
}
- public Serde valueSerde() {
- return oldProcessorContext != null ? oldProcessorContext.valueSerde() :
- newProcessorContext != null ? newProcessorContext.valueSerde() :
stateStorecontext.valueSerde();
+
+ public Serde<?> valueSerde() {
+ return valueSerdeSupplier.get();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
index 306d6bf9cfb..001f866e963 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
@@ -58,7 +58,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal
private final boolean loggingEnabled;
private int partition;
private String changelogTopic;
- private InternalProcessorContext context;
+ private InternalProcessorContext<?, ?> context;
private boolean minValid;
public static class Builder<K, V> implements
StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> {
@@ -156,7 +156,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal
@Override
public void setSerdesIfNull(final SerdeGetter getter) {
keySerde = keySerde == null ? (Serde<K>) getter.keySerde() : keySerde;
- valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde;
+ valueSerde = valueSerde == null ? (Serde<V>) getter.valueSerde() :
valueSerde;
}
private long observedStreamTime() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java
index cf44ca19bb1..813024fe7c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java
@@ -20,13 +20,11 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.StateSerdes;
-
public class StoreSerdeInitializer {
static <K, V> StateSerdes<K, V> prepareStoreSerde(final StateStoreContext
context,
final String storeName,
@@ -41,19 +39,6 @@ public class StoreSerdeInitializer {
);
}
- static <K, V> StateSerdes<K, V> prepareStoreSerde(final ProcessorContext
context,
- final String storeName,
- final String
changelogTopic,
- final Serde<K> keySerde,
- final Serde<V>
valueSerde,
- final PrepareFunc<V>
prepareValueSerdeFunc) {
- return new StateSerdes<>(
- changelogTopic,
- prepareSerde(WrappingNullableUtils::prepareKeySerde, storeName,
keySerde, new SerdeGetter(context), true, context.taskId()),
- prepareSerde(prepareValueSerdeFunc, storeName, valueSerde, new
SerdeGetter(context), false, context.taskId())
- );
- }
-
private static <T> Serde<T> prepareSerde(final PrepareFunc<T> prepare,
final String storeName,
final Serde<T> serde,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 8fbde2f78e1..c1ff46f873e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStoreContext;
@@ -59,10 +60,11 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
public Sensor sensor;
public long offset;
+ @SuppressWarnings({"rawtypes", "unchecked"})
@BeforeEach
public void setUp() {
- when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde());
- when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde());
+ when(serdeGetter.keySerde()).thenReturn((Serde) new
Serdes.StringSerde());
+ when(serdeGetter.valueSerde()).thenReturn((Serde) new
Serdes.StringSerde());
final Metrics metrics = new Metrics();
offset = 0;
streamsMetrics = new StreamsMetricsImpl(metrics, "test-client",
"processId", new MockTime());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java
index 2891825ba52..eb85cde807c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java
@@ -21,8 +21,6 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.MockInternalNewProcessorContext;
@@ -62,7 +60,7 @@ public class StoreSerdeInitializerTest {
utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(),
any())).thenReturn(valueSerde);
final StateSerdes<String, String> result =
StoreSerdeInitializer.prepareStoreSerde(
- (ProcessorContext) context, "myStore", "topic", keySerde,
valueSerde, WrappingNullableUtils::prepareValueSerde);
+ context, "myStore", "topic", keySerde, valueSerde,
WrappingNullableUtils::prepareValueSerde);
assertThat(result.keySerde(), equalTo(keySerde));
assertThat(result.valueSerde(), equalTo(valueSerde));
@@ -77,7 +75,7 @@ public class StoreSerdeInitializerTest {
.thenThrow(new ConfigException("Please set
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));
final Throwable exception = assertThrows(StreamsException.class,
- () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext)
context, "myStore", "topic",
+ () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore",
"topic",
new Serdes.StringSerde(), new Serdes.StringSerde(),
WrappingNullableUtils::prepareValueSerde));
assertThat(exception.getMessage(), equalTo("Failed to initialize key
serdes for store myStore"));
@@ -92,7 +90,7 @@ public class StoreSerdeInitializerTest {
.thenThrow(new ConfigException("Please set
StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG"));
final Throwable exception = assertThrows(StreamsException.class,
- () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext)
context, "myStore", "topic",
+ () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore",
"topic",
new Serdes.StringSerde(), new Serdes.StringSerde(),
WrappingNullableUtils::prepareValueSerde));
assertThat(exception.getMessage(), equalTo("Failed to initialize value
serdes for store myStore"));
@@ -107,7 +105,7 @@ public class StoreSerdeInitializerTest {
.thenThrow(new ConfigException("Please set
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));
final Throwable exception = assertThrows(StreamsException.class,
- () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext)
context, "myStore", "topic",
+ () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore",
"topic",
new Serdes.StringSerde(), new Serdes.StringSerde(),
WrappingNullableUtils::prepareValueSerde));
assertThat(exception.getMessage(), equalTo("Failed to initialize key
serdes for store myStore"));
@@ -122,7 +120,7 @@ public class StoreSerdeInitializerTest {
.thenThrow(new ConfigException("Please set
StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG"));
final Throwable exception = assertThrows(StreamsException.class,
- () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext)
context, "myStore", "topic",
+ () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore",
"topic",
new Serdes.StringSerde(), new Serdes.StringSerde(),
WrappingNullableUtils::prepareValueSerde));
assertThat(exception.getMessage(), equalTo("Failed to initialize value
serdes for store myStore"));
@@ -136,7 +134,7 @@ public class StoreSerdeInitializerTest {
utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(),
any())).thenThrow(new StreamsException(""));
final Throwable exception = assertThrows(StreamsException.class,
- () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext)
context, "myStore", "topic",
+ () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore",
"topic",
new Serdes.StringSerde(), new Serdes.StringSerde(),
WrappingNullableUtils::prepareValueSerde));
assertThat(exception.getMessage(), equalTo("Failed to initialize key
serdes for store myStore"));
@@ -149,7 +147,7 @@ public class StoreSerdeInitializerTest {
utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(),
any())).thenThrow(new StreamsException(""));
final Throwable exception = assertThrows(StreamsException.class,
- () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext)
context, "myStore", "topic",
+ () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore",
"topic",
new Serdes.StringSerde(), new Serdes.StringSerde(),
WrappingNullableUtils::prepareValueSerde));
assertThat(exception.getMessage(), equalTo("Failed to initialize value
serdes for store myStore"));