This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 54dc27c KAFKA-10515: Properly initialize nullable Serdes with default
values (#9467)
54dc27c is described below
commit 54dc27c81306228ecdfbaa6abcee9d730420ea7b
Author: Thorsten Hake <[email protected]>
AuthorDate: Thu Oct 29 23:37:36 2020 +0100
KAFKA-10515: Properly initialize nullable Serdes with default values (#9467)
This is a cherry pick of PR #9338 on branch 2.6.
Introduced the notion of WrappingNullableSerdes
(aligned to the concept of WrappingNullableSerializer and
WrappingNullableDeserializer) and centralized
initialization in WrappingNullables.
The added integration test KTableKTableForeignKeyJoinDistributedTest tests
whether all serdes are now correctly set on all stream clients.
Reviewers: John Roesler <[email protected]>
---
.../kstream/internals/WrappingNullableSerde.java | 68 ++++++
.../kstream/internals/WrappingNullableUtils.java | 97 +++++++++
.../foreignkeyjoin/SubscriptionWrapperSerde.java | 26 +--
.../processor/internals/ProcessorContextUtils.java | 56 +++++
.../streams/processor/internals/SinkNode.java | 26 +--
.../streams/processor/internals/SourceNode.java | 25 +--
.../state/internals/MeteredKeyValueStore.java | 12 +-
.../state/internals/MeteredSessionStore.java | 6 +-
.../internals/MeteredTimestampedKeyValueStore.java | 15 +-
.../internals/MeteredTimestampedWindowStore.java | 14 +-
.../state/internals/MeteredWindowStore.java | 10 +-
.../internals/ValueAndTimestampDeserializer.java | 11 +-
.../state/internals/ValueAndTimestampSerde.java | 41 +---
.../internals/ValueAndTimestampSerializer.java | 14 +-
.../KTableKTableForeignKeyJoinDistributedTest.java | 235 +++++++++++++++++++++
.../internals/GlobalStateStoreProviderTest.java | 8 +
.../MeteredTimestampedKeyValueStoreTest.java | 6 +
17 files changed, 553 insertions(+), 117 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
new file mode 100644
index 0000000..c15ff23
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Map;
+import java.util.Objects;
+
+public abstract class WrappingNullableSerde<T, InnerK, InnerV> implements
Serde<T> {
+ private final WrappingNullableSerializer<T, InnerK, InnerV> serializer;
+ private final WrappingNullableDeserializer<T, InnerK, InnerV> deserializer;
+
+ protected WrappingNullableSerde(final WrappingNullableSerializer<T,
InnerK, InnerV> serializer,
+ final WrappingNullableDeserializer<T,
InnerK, InnerV> deserializer) {
+ Objects.requireNonNull(serializer, "serializer can't be null");
+ Objects.requireNonNull(deserializer, "deserializer can't be null");
+ this.serializer = serializer;
+ this.deserializer = deserializer;
+ }
+
+ @Override
+ public Serializer<T> serializer() {
+ return serializer;
+ }
+
+ @Override
+ public Deserializer<T> deserializer() {
+ return deserializer;
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs,
+ final boolean isKey) {
+ serializer.configure(configs, isKey);
+ deserializer.configure(configs, isKey);
+ }
+
+ @Override
+ public void close() {
+ serializer.close();
+ deserializer.close();
+ }
+
+ public void setIfUnset(final Serde<InnerK> defaultKeySerde, final
Serde<InnerV> defaultValueSerde) {
+ Objects.requireNonNull(defaultKeySerde);
+ Objects.requireNonNull(defaultValueSerde);
+ serializer.setIfUnset(defaultKeySerde.serializer(),
defaultValueSerde.serializer());
+ deserializer.setIfUnset(defaultKeySerde.deserializer(),
defaultValueSerde.deserializer());
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
new file mode 100644
index 0000000..23954d2
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * If a component's serdes are Wrapping serdes, then they require a little
extra setup
+ * to be fully initialized at run time.
+ */
+public class WrappingNullableUtils {
+
+ @SuppressWarnings("unchecked")
+ private static <T> Deserializer<T> prepareDeserializer(final
Deserializer<T> specificDeserializer, final Deserializer<?>
contextKeyDeserializer, final Deserializer<?> contextValueDeserializer, final
boolean isKey) {
+ Deserializer<T> deserializerToUse = specificDeserializer;
+ if (deserializerToUse == null) {
+ deserializerToUse = (Deserializer<T>) (isKey ?
contextKeyDeserializer : contextValueDeserializer);
+ } else {
+ initNullableDeserializer(deserializerToUse,
contextKeyDeserializer, contextValueDeserializer);
+ }
+ return deserializerToUse;
+ }
+ @SuppressWarnings("unchecked")
+ private static <T> Serializer<T> prepareSerializer(final Serializer<T>
specificSerializer, final Serializer<?> contextKeySerializer, final
Serializer<?> contextValueSerializer, final boolean isKey) {
+ Serializer<T> serializerToUse = specificSerializer;
+ if (serializerToUse == null) {
+ serializerToUse = (Serializer<T>) (isKey ? contextKeySerializer :
contextValueSerializer);
+ } else {
+ initNullableSerializer(serializerToUse, contextKeySerializer,
contextValueSerializer);
+ }
+ return serializerToUse;
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde,
final Serde<?> contextKeySerde, final Serde<?> contextValueSerde, final boolean
isKey) {
+ Serde<T> serdeToUse = specificSerde;
+ if (serdeToUse == null) {
+ serdeToUse = (Serde<T>) (isKey ? contextKeySerde :
contextValueSerde);
+ } else if (serdeToUse instanceof WrappingNullableSerde) {
+ ((WrappingNullableSerde) serdeToUse).setIfUnset(contextKeySerde,
contextValueSerde);
+ }
+ return serdeToUse;
+ }
+
+ public static <K> Deserializer<K> prepareKeyDeserializer(final
Deserializer<K> specificDeserializer, final Deserializer<?>
contextKeyDeserializer, final Deserializer<?> contextValueDeserializer) {
+ return prepareDeserializer(specificDeserializer,
contextKeyDeserializer, contextValueDeserializer, true);
+ }
+
+ public static <V> Deserializer<V> prepareValueDeserializer(final
Deserializer<V> specificDeserializer, final Deserializer<?>
contextKeyDeserializer, final Deserializer<?> contextValueDeserializer) {
+ return prepareDeserializer(specificDeserializer,
contextKeyDeserializer, contextValueDeserializer, false);
+ }
+
+ public static <K> Serializer<K> prepareKeySerializer(final Serializer<K>
specificSerializer, final Serializer<?> contextKeySerializer, final
Serializer<?> contextValueSerializer) {
+ return prepareSerializer(specificSerializer, contextKeySerializer,
contextValueSerializer, true);
+ }
+
+ public static <V> Serializer<V> prepareValueSerializer(final Serializer<V>
specificSerializer, final Serializer<?> contextKeySerializer, final
Serializer<?> contextValueSerializer) {
+ return prepareSerializer(specificSerializer, contextKeySerializer,
contextValueSerializer, false);
+ }
+
+ public static <K> Serde<K> prepareKeySerde(final Serde<K> specificSerde,
final Serde<?> keySerde, final Serde<?> valueSerde) {
+ return prepareSerde(specificSerde, keySerde, valueSerde, true);
+ }
+
+ public static <V> Serde<V> prepareValueSerde(final Serde<V> specificSerde,
final Serde<?> keySerde, final Serde<?> valueSerde) {
+ return prepareSerde(specificSerde, keySerde, valueSerde, false);
+ }
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static <T> void initNullableSerializer(final Serializer<T>
specificSerializer, final Serializer<?> contextKeySerializer, final
Serializer<?> contextValueSerializer) {
+ if (specificSerializer instanceof WrappingNullableSerializer) {
+ ((WrappingNullableSerializer)
specificSerializer).setIfUnset(contextKeySerializer, contextValueSerializer);
+ }
+ }
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static <T> void initNullableDeserializer(final Deserializer<T>
specificDeserializer, final Deserializer<?> contextKeyDeserializer, final
Deserializer<?> contextValueDeserializer) {
+ if (specificDeserializer instanceof WrappingNullableDeserializer) {
+ ((WrappingNullableDeserializer)
specificDeserializer).setIfUnset(contextKeyDeserializer,
contextValueDeserializer);
+ }
+ }
+
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index d2cc989..7fe124a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -21,32 +21,22 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.function.Supplier;
-public class SubscriptionWrapperSerde<K> implements
Serde<SubscriptionWrapper<K>> {
- private final SubscriptionWrapperSerializer<K> serializer;
- private final SubscriptionWrapperDeserializer<K> deserializer;
-
+public class SubscriptionWrapperSerde<K> extends
WrappingNullableSerde<SubscriptionWrapper<K>, K, Void> {
public SubscriptionWrapperSerde(final Supplier<String>
primaryKeySerializationPseudoTopicSupplier,
final Serde<K> primaryKeySerde) {
- serializer = new
SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopicSupplier,
- primaryKeySerde ==
null ? null : primaryKeySerde.serializer());
- deserializer = new
SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopicSupplier,
- primaryKeySerde
== null ? null : primaryKeySerde.deserializer());
- }
-
- @Override
- public Serializer<SubscriptionWrapper<K>> serializer() {
- return serializer;
- }
-
- @Override
- public Deserializer<SubscriptionWrapper<K>> deserializer() {
- return deserializer;
+ super(
+ new
SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopicSupplier,
+ primaryKeySerde == null ? null
: primaryKeySerde.serializer()),
+ new
SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopicSupplier,
+ primaryKeySerde == null ?
null : primaryKeySerde.deserializer())
+ );
}
private static class SubscriptionWrapperSerializer<K>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
new file mode 100644
index 0000000..25721ec
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+
+/**
+ * This class bridges the gap for components that _should_ be compatible with
+ * the public ProcessorContext interface, but have come to depend on features
+ * in InternalProcessorContext. In theory, all the features adapted here could
+ * migrate to the public interface, so each method in this class should
reference
+ * the ticket that would ultimately obviate it.
+ */
+public final class ProcessorContextUtils {
+
+ private ProcessorContextUtils() {}
+
+ public static Serializer<?> getKeySerializer(final ProcessorContext
processorContext) {
+ return getSerializer(processorContext, true);
+ }
+ public static Serializer<?> getValueSerializer(final ProcessorContext
processorContext) {
+ return getSerializer(processorContext, false);
+ }
+ private static Serializer<?> getSerializer(final ProcessorContext
processorContext, final boolean key) {
+ final Serde<?> serde = key ? processorContext.keySerde() :
processorContext.valueSerde();
+ return serde == null ? null : serde.serializer();
+ }
+ public static Deserializer<?> getKeyDeserializer(final ProcessorContext
processorContext) {
+ return getDeserializer(processorContext, true);
+ }
+ public static Deserializer<?> getValueDeserializer(final ProcessorContext
processorContext) {
+ return getDeserializer(processorContext, false);
+ }
+ private static Deserializer<?> getDeserializer(final ProcessorContext
processorContext, final boolean key) {
+ final Serde<?> serde = key ? processorContext.keySerde() :
processorContext.valueSerde();
+ return serde == null ? null : serde.deserializer();
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 9b0a254..4efdfbb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -18,10 +18,12 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
+import static
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerializer;
+import static
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerializer;
+
public class SinkNode<K, V> extends ProcessorNode<K, V> {
private Serializer<K> keySerializer;
@@ -52,28 +54,14 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
throw new UnsupportedOperationException("sink node does not allow
addChild");
}
- @SuppressWarnings("unchecked")
@Override
public void init(final InternalProcessorContext context) {
super.init(context);
this.context = context;
-
- // if serializers are null, get the default ones from the context
- if (keySerializer == null) {
- keySerializer = (Serializer<K>) context.keySerde().serializer();
- }
- if (valSerializer == null) {
- valSerializer = (Serializer<V>) context.valueSerde().serializer();
- }
-
- // if serializers are internal wrapping serializers that may need to
be given the default serializer
- // then pass it the default one from the context
- if (valSerializer instanceof WrappingNullableSerializer) {
- ((WrappingNullableSerializer) valSerializer).setIfUnset(
- context.keySerde().serializer(),
- context.valueSerde().serializer()
- );
- }
+ final Serializer<?> contextKeySerializer =
ProcessorContextUtils.getKeySerializer(context);
+ final Serializer<?> contextValueSerializer =
ProcessorContextUtils.getValueSerializer(context);
+ keySerializer = prepareKeySerializer(keySerializer,
contextKeySerializer, contextValueSerializer);
+ valSerializer = prepareValueSerializer(valSerializer,
contextKeySerializer, contextValueSerializer);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 8508a7d..d49178a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -19,10 +19,12 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.processor.TimestampExtractor;
import
org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
+import static
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeyDeserializer;
+import static
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueDeserializer;
+
public class SourceNode<K, V> extends ProcessorNode<K, V> {
private InternalProcessorContext context;
@@ -55,7 +57,6 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
return valDeserializer.deserialize(topic, headers, data);
}
- @SuppressWarnings("unchecked")
@Override
public void init(final InternalProcessorContext context) {
// It is important to first create the sensor before calling init on
the
@@ -72,22 +73,10 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
super.init(context);
this.context = context;
- // if deserializers are null, get the default ones from the context
- if (this.keyDeserializer == null) {
- this.keyDeserializer = (Deserializer<K>)
context.keySerde().deserializer();
- }
- if (this.valDeserializer == null) {
- this.valDeserializer = (Deserializer<V>)
context.valueSerde().deserializer();
- }
-
- // if deserializers are internal wrapping deserializers that may need
to be given the default
- // then pass it the default one from the context
- if (valDeserializer instanceof WrappingNullableDeserializer) {
- ((WrappingNullableDeserializer) valDeserializer).setIfUnset(
- context.keySerde().deserializer(),
- context.valueSerde().deserializer()
- );
- }
+ final Deserializer<?> contextKeyDeserializer =
ProcessorContextUtils.getKeyDeserializer(context);
+ final Deserializer<?> contextValueDeserializer =
ProcessorContextUtils.getValueDeserializer(context);
+ keyDeserializer = prepareKeyDeserializer(keyDeserializer,
contextKeyDeserializer, contextValueDeserializer);
+ valDeserializer = prepareValueDeserializer(valDeserializer,
contextKeyDeserializer, contextValueDeserializer);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index c844e03..4608985 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -34,6 +35,7 @@ import
org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.ArrayList;
import java.util.List;
+import static
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
/**
@@ -101,12 +103,16 @@ public class MeteredKeyValueStore<K, V>
maybeMeasureLatency(() -> super.init(context, root), time,
restoreSensor);
}
- @SuppressWarnings("unchecked")
+ protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde,
final Serde<?> contextKeySerde, final Serde<?> contextValueSerde) {
+ return WrappingNullableUtils.prepareValueSerde(valueSerde,
contextKeySerde, contextValueSerde);
+ }
+
+ @Deprecated
void initStoreSerde(final ProcessorContext context) {
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+ prepareKeySerde(keySerde, context.keySerde(), context.valueSerde()),
+ prepareValueSerdeForStore(valueSerde, context.keySerde(),
context.valueSerde()));
}
@SuppressWarnings("unchecked")
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index c7d4290..6421b9f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -65,14 +66,13 @@ public class MeteredSessionStore<K, V>
this.time = time;
}
- @SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context,
final StateStore root) {
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+ WrappingNullableUtils.prepareKeySerde(keySerde,
context.keySerde(), context.valueSerde()),
+ WrappingNullableUtils.prepareValueSerde(valueSerde,
context.keySerde(), context.valueSerde()));
taskId = context.taskId().toString();
streamsMetrics = (StreamsMetricsImpl) context.metrics();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index d1446dc..042188e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -22,10 +22,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -50,11 +47,13 @@ public class MeteredTimestampedKeyValueStore<K, V>
}
@SuppressWarnings("unchecked")
- void initStoreSerde(final ProcessorContext context) {
- serdes = new StateSerdes<>(
- ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>)
context.valueSerde()) : valueSerde);
+ @Override
+ protected Serde<ValueAndTimestamp<V>> prepareValueSerdeForStore(final
Serde<ValueAndTimestamp<V>> valueSerde, final Serde<?> contextKeySerde, final
Serde<?> contextValueSerde) {
+ if (valueSerde == null) {
+ return new ValueAndTimestampSerde<>((Serde<V>) contextValueSerde);
+ } else {
+ return super.prepareValueSerdeForStore(valueSerde,
contextKeySerde, contextValueSerde);
+ }
}
public RawAndDeserializedValue<V> getWithBinary(final K key) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
index aecf69f..cb68863 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -19,9 +19,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
@@ -49,10 +46,11 @@ class MeteredTimestampedWindowStore<K, V>
@SuppressWarnings("unchecked")
@Override
- void initStoreSerde(final ProcessorContext context) {
- serdes = new StateSerdes<>(
- ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>)
context.valueSerde()) : valueSerde);
+ protected Serde<ValueAndTimestamp<V>> prepareValueSerde(final
Serde<ValueAndTimestamp<V>> valueSerde, final Serde<?> contextKeySerde, final
Serde<?> contextValueSerde) {
+ if (valueSerde == null) {
+ return new ValueAndTimestampSerde<>((Serde<V>) contextValueSerde);
+ } else {
+ return super.prepareValueSerde(valueSerde, contextKeySerde,
contextValueSerde);
+ }
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index fd39468..4bef5d0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -32,6 +33,7 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
+import static
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
public class MeteredWindowStore<K, V>
@@ -84,13 +86,15 @@ public class MeteredWindowStore<K, V>
// register and possibly restore the state from the logs
maybeMeasureLatency(() -> super.init(context, root), time,
restoreSensor);
}
+ protected Serde<V> prepareValueSerde(final Serde<V> valueSerde, final
Serde<?> contextKeySerde, final Serde<?> contextValueSerde) {
+ return WrappingNullableUtils.prepareValueSerde(valueSerde,
contextKeySerde, contextValueSerde);
+ }
- @SuppressWarnings("unchecked")
void initStoreSerde(final ProcessorContext context) {
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+ prepareKeySerde(keySerde, context.keySerde(),
context.valueSerde()),
+ prepareValueSerde(valueSerde, context.keySerde(),
context.valueSerde()));
}
@SuppressWarnings("unchecked")
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
index 7cd37d2..b7e56a3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
@@ -18,13 +18,16 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
-class ValueAndTimestampDeserializer<V> implements
Deserializer<ValueAndTimestamp<V>> {
+import static
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+class ValueAndTimestampDeserializer<V> implements
WrappingNullableDeserializer<ValueAndTimestamp<V>, Void, V> {
private final static LongDeserializer LONG_DESERIALIZER = new
LongDeserializer();
public final Deserializer<V> valueDeserializer;
@@ -81,4 +84,10 @@ class ValueAndTimestampDeserializer<V> implements
Deserializer<ValueAndTimestamp
return LONG_DESERIALIZER.deserialize(null,
rawTimestamp(rawValueAndTimestamp));
}
+ @Override
+ public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer,
final Deserializer<V> defaultValueDeserializer) {
+ // ValueAndTimestampDeserializer never wraps a null deserializer (or
configure would throw),
+ // but it may wrap a deserializer that itself wraps a null
deserializer.
+ initNullableDeserializer(valueDeserializer, defaultKeyDeserializer,
defaultValueDeserializer);
+ }
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
index c02992f..1936d29 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
@@ -16,44 +16,17 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
import org.apache.kafka.streams.state.ValueAndTimestamp;
-import java.util.Map;
-import java.util.Objects;
-
-public class ValueAndTimestampSerde<V> implements Serde<ValueAndTimestamp<V>> {
- private final ValueAndTimestampSerializer<V> valueAndTimestampSerializer;
- private final ValueAndTimestampDeserializer<V>
valueAndTimestampDeserializer;
+import static java.util.Objects.requireNonNull;
+public class ValueAndTimestampSerde<V> extends
WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
public ValueAndTimestampSerde(final Serde<V> valueSerde) {
- Objects.requireNonNull(valueSerde);
- valueAndTimestampSerializer = new
ValueAndTimestampSerializer<>(valueSerde.serializer());
- valueAndTimestampDeserializer = new
ValueAndTimestampDeserializer<>(valueSerde.deserializer());
- }
-
- @Override
- public void configure(final Map<String, ?> configs,
- final boolean isKey) {
- valueAndTimestampSerializer.configure(configs, isKey);
- valueAndTimestampDeserializer.configure(configs, isKey);
- }
-
- @Override
- public void close() {
- valueAndTimestampSerializer.close();
- valueAndTimestampDeserializer.close();
- }
-
- @Override
- public Serializer<ValueAndTimestamp<V>> serializer() {
- return valueAndTimestampSerializer;
- }
-
- @Override
- public Deserializer<ValueAndTimestamp<V>> deserializer() {
- return valueAndTimestampDeserializer;
+ super(
+ new ValueAndTimestampSerializer<>(requireNonNull(valueSerde,
"valueSerde was null").serializer()),
+ new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde,
"valueSerde was null").deserializer())
+ );
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
index 3b2663d..58c6159 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
@@ -18,13 +18,16 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
-public class ValueAndTimestampSerializer<V> implements
Serializer<ValueAndTimestamp<V>> {
+import static
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+public class ValueAndTimestampSerializer<V> implements
WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> {
public final Serializer<V> valueSerializer;
private final Serializer<Long> timestampSerializer;
@@ -54,7 +57,7 @@ public class ValueAndTimestampSerializer<V> implements
Serializer<ValueAndTimest
/**
* @param left the serialized byte array of the old record in state store
* @param right the serialized byte array of the new record being processed
- * @return true if the two serialized values are the same (excluding
timestamp) or
+ * @return true if the two serialized values are the same (excluding
timestamp) or
* if the timestamp of right is less than left (indicating
out of order record)
* false otherwise
*/
@@ -125,4 +128,11 @@ public class ValueAndTimestampSerializer<V> implements
Serializer<ValueAndTimest
valueSerializer.close();
timestampSerializer.close();
}
+
+ @Override
+ public void setIfUnset(final Serializer<Void> defaultKeySerializer, final
Serializer<V> defaultValueSerializer) {
+ // ValueAndTimestampSerializer never wraps a null serializer (or
configure would throw),
+ // but it may wrap a serializer that itself wraps a null serializer.
+ initNullableSerializer(valueSerializer, defaultKeySerializer,
defaultValueSerializer);
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
new file mode 100644
index 0000000..de71360
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.junit.Assert.assertEquals;
+
+@Category({IntegrationTest.class})
+public class KTableKTableForeignKeyJoinDistributedTest {
+ private static final int NUM_BROKERS = 1;
+ private static final String LEFT_TABLE = "left_table";
+ private static final String RIGHT_TABLE = "right_table";
+ private static final String OUTPUT = "output-topic";
+ @ClassRule
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
+ private static final Properties CONSUMER_CONFIG = new Properties();
+
+ @Rule
+ public TestName testName = new TestName();
+
+
+ private static final String INPUT_TOPIC = "input-topic";
+
+ private KafkaStreams client1;
+ private KafkaStreams client2;
+
+ private volatile boolean client1IsOk = false;
+ private volatile boolean client2IsOk = false;
+
+ @BeforeClass
+ public static void createTopics() throws InterruptedException {
+ CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+ }
+
+ @Before
+ public void setupTopics() throws InterruptedException {
+ CLUSTER.createTopic(LEFT_TABLE, 1, 1);
+ CLUSTER.createTopic(RIGHT_TABLE, 1, 1);
+ CLUSTER.createTopic(OUTPUT, 11, 1);
+
+ //Fill test tables
+ final Properties producerConfig = new Properties();
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
+ final List<KeyValue<String, String>> leftTable = Arrays.asList(
+ new KeyValue<>("lhsValue1", "lhsValue1|rhs1"),
+ new KeyValue<>("lhsValue2", "lhsValue2|rhs2"),
+ new KeyValue<>("lhsValue3", "lhsValue3|rhs3"),
+ new KeyValue<>("lhsValue4", "lhsValue4|rhs4")
+ );
+ final List<KeyValue<String, String>> rightTable = Arrays.asList(
+ new KeyValue<>("rhs1", "rhsValue1"),
+ new KeyValue<>("rhs2", "rhsValue2"),
+ new KeyValue<>("rhs3", "rhsValue3")
+ );
+
+ IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE,
leftTable, producerConfig, CLUSTER.time);
+ IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE,
rightTable, producerConfig, CLUSTER.time);
+
+ CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG,
"ktable-ktable-distributed-consumer");
+ CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ }
+
+ @After
+ public void after() {
+ client1.close();
+ client2.close();
+ quietlyCleanStateAfterTest(CLUSTER, client1);
+ quietlyCleanStateAfterTest(CLUSTER, client2);
+ }
+
+ public Properties getStreamsConfiguration() {
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,
StreamsConfig.OPTIMIZE);
+ return streamsConfiguration;
+ }
+
+
+ private void configureBuilder(final StreamsBuilder builder) {
+ final KTable<String, String> left = builder.table(
+ LEFT_TABLE
+ );
+ final KTable<String, String> right = builder.table(
+ RIGHT_TABLE
+ );
+
+ final Function<String, String> extractor = value ->
value.split("\\|")[1];
+ final ValueJoiner<String, String, String> joiner = (value1, value2) ->
"(" + value1 + "," + value2 + ")";
+
+ final KTable<String, String> fkJoin = left.join(right, extractor,
joiner);
+ fkJoin
+ .toStream()
+ .to(OUTPUT);
+ }
+
+ @Test
+ public void shouldBeInitializedWithDefaultSerde() throws Exception {
+ final Properties streamsConfiguration1 = getStreamsConfiguration();
+ final Properties streamsConfiguration2 = getStreamsConfiguration();
+
+ //Each streams client needs to have its own StreamsBuilder in order
to simulate
+ //a truly distributed run
+ final StreamsBuilder builder1 = new StreamsBuilder();
+ configureBuilder(builder1);
+ final StreamsBuilder builder2 = new StreamsBuilder();
+ configureBuilder(builder2);
+
+
+ createClients(
+ builder1.build(streamsConfiguration1),
+ streamsConfiguration1,
+ builder2.build(streamsConfiguration2),
+ streamsConfiguration2
+ );
+
+ setStateListenersForVerification(thread ->
!thread.activeTasks().isEmpty());
+
+ startClients();
+
+ waitUntilBothClientAreOK(
+ "At least one client did not reach state RUNNING with active
tasks"
+ );
+ final Set<KeyValue<String, String>> expectedResult = new HashSet<>();
+ expectedResult.add(new KeyValue<>("lhsValue1",
"(lhsValue1|rhs1,rhsValue1)"));
+ expectedResult.add(new KeyValue<>("lhsValue2",
"(lhsValue2|rhs2,rhsValue2)"));
+ expectedResult.add(new KeyValue<>("lhsValue3",
"(lhsValue3|rhs3,rhsValue3)"));
+ final Set<KeyValue<String, String>> result = new
HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ CONSUMER_CONFIG,
+ OUTPUT,
+ expectedResult.size()));
+
+ assertEquals(expectedResult, result);
+ //Check that both clients are still running
+ assertEquals(KafkaStreams.State.RUNNING, client1.state());
+ assertEquals(KafkaStreams.State.RUNNING, client2.state());
+ }
+
+ private void createClients(final Topology topology1,
+ final Properties streamsConfiguration1,
+ final Topology topology2,
+ final Properties streamsConfiguration2) {
+
+ client1 = new KafkaStreams(topology1, streamsConfiguration1);
+ client2 = new KafkaStreams(topology2, streamsConfiguration2);
+ }
+
+ private void setStateListenersForVerification(final
Predicate<ThreadMetadata> taskCondition) {
+ client1.setStateListener((newState, oldState) -> {
+ if (newState == KafkaStreams.State.RUNNING &&
+
client1.localThreadsMetadata().stream().allMatch(taskCondition)) {
+ client1IsOk = true;
+ }
+ });
+ client2.setStateListener((newState, oldState) -> {
+ if (newState == KafkaStreams.State.RUNNING &&
+
client2.localThreadsMetadata().stream().allMatch(taskCondition)) {
+ client2IsOk = true;
+ }
+ });
+ }
+
+ private void startClients() {
+ client1.start();
+ client2.start();
+ }
+
+ private void waitUntilBothClientAreOK(final String message) throws
Exception {
+ TestUtils.waitForCondition(() -> client1IsOk && client2IsOk,
+ 30 * 1000,
+ message + ": "
+ + "Client 1 is " + (!client1IsOk ? "NOT " : "") + "OK,
"
+ + "client 2 is " + (!client2IsOk ? "NOT " : "") + "OK."
+ );
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 1ab8684..024dd44 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -96,12 +97,19 @@ public class GlobalStateStoreProviderTest {
.anyTimes();
expect(mockContext.taskId()).andReturn(new TaskId(0, 0)).anyTimes();
expect(mockContext.recordCollector()).andReturn(null).anyTimes();
+ expectSerdes(mockContext);
replay(mockContext);
for (final StateStore store : stores.values()) {
store.init(mockContext, null);
}
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private static void expectSerdes(final ProcessorContextImpl context) {
+ expect(context.keySerde()).andReturn((Serde)
Serdes.String()).anyTimes();
+ expect(context.valueSerde()).andReturn((Serde)
Serdes.Long()).anyTimes();
+ }
+
@Test
public void shouldReturnSingleItemListIfStoreExists() {
final GlobalStateStoreProvider provider =
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 5522855..44b215d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -128,6 +128,7 @@ public class MeteredTimestampedKeyValueStoreTest {
expect(context.metrics())
.andReturn(new StreamsMetricsImpl(metrics, "test",
builtInMetricsVersion)).anyTimes();
expect(context.taskId()).andReturn(taskId).anyTimes();
+ expectSerdes(context);
expect(inner.name()).andReturn("metered").anyTimes();
storeLevelGroup =
StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ?
STORE_LEVEL_GROUP_FROM_0100_TO_24 : STORE_LEVEL_GROUP;
@@ -140,6 +141,11 @@ public class MeteredTimestampedKeyValueStoreTest {
);
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private static void expectSerdes(final InternalProcessorContext context) {
+ expect(context.keySerde()).andReturn((Serde)
Serdes.String()).anyTimes();
+ expect(context.valueSerde()).andReturn((Serde)
Serdes.Long()).anyTimes();
+ }
private void init() {
replay(inner, context);