This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c5f8ae0424a KAFKA-16260: Deprecate window.size.ms and
window.inner.class.serde in StreamsConfig (#18297)
c5f8ae0424a is described below
commit c5f8ae0424ad189cca87126e55a4e17c676578d2
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Mar 31 20:15:37 2025 +0800
KAFKA-16260: Deprecate window.size.ms and window.inner.class.serde in
StreamsConfig (#18297)
The `window.size.ms` and `windowed.inner.class.serde` settings are not true
Kafka Streams configs, and are ignored when set from a Kafka Streams
application. Both belong on the client's windowed serializers/deserializers.
Reviewers: Lucas Brutschy <[email protected]>
Signed-off-by: PoAn Yang <[email protected]>
---
docs/upgrade.html | 8 ++
.../KStreamAggregationIntegrationTest.java | 20 ++--
.../SelfJoinUpgradeIntegrationTest.java | 3 +-
.../SlidingWindowedKStreamIntegrationTest.java | 9 +-
.../TimeWindowedKStreamIntegrationTest.java | 9 +-
.../org/apache/kafka/streams/StreamsConfig.java | 23 ++++-
.../kstream/SessionWindowedDeserializer.java | 49 ++++++---
.../streams/kstream/SessionWindowedSerializer.java | 48 ++++++---
.../streams/kstream/TimeWindowedDeserializer.java | 115 +++++++++++++--------
.../streams/kstream/TimeWindowedSerializer.java | 48 ++++++---
.../kstream/SessionWindowedDeserializerTest.java | 37 ++++++-
.../kstream/SessionWindowedSerializerTest.java | 37 ++++++-
.../kstream/TimeWindowedDeserializerTest.java | 71 +++++++++++--
.../kstream/TimeWindowedSerializerTest.java | 36 ++++++-
14 files changed, 394 insertions(+), 119 deletions(-)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 88be13c531e..3ffa47f0b02 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -36,6 +36,14 @@
</li>
</ul>
</li>
+ <li><b>Stream</b>
+ <ul>
+ <li>
+ The <code>window.size.ms</code> and
<code>windowed.inner.class.serde</code> in <code>StreamsConfig</code> are
deprecated.
+ Use the corresponding string constants defined in
<code>TimeWindowedSerializer</code>, <code>TimeWindowedDeserializer</code>,
<code>SessionWindowedSerializer</code> and
<code>SessionWindowedDeserializer</code> instead.
+ </li>
+ </ul>
+ </li>
</ul>
<h4><a id="upgrade_4_0_0" href="#upgrade_4_0_0">Upgrading to 4.0.0</a></h4>
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index bf0d54bc5c0..7037ce1368d 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -1065,9 +1065,12 @@ public class KStreamAggregationIntegrationTest {
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializer.getClass().getName());
- consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
- if (keyDeserializer instanceof TimeWindowedDeserializer ||
keyDeserializer instanceof SessionWindowedDeserializer) {
-
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
+ consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG,
500L);
+ if (keyDeserializer instanceof TimeWindowedDeserializer) {
+
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
+ Serdes.serdeFrom(innerClass).getClass().getName());
+ } else if (keyDeserializer instanceof SessionWindowedDeserializer) {
+
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return
IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
@@ -1089,9 +1092,12 @@ public class KStreamAggregationIntegrationTest {
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializer.getClass().getName());
- consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
- if (keyDeserializer instanceof TimeWindowedDeserializer ||
keyDeserializer instanceof SessionWindowedDeserializer) {
-
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
+ consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG,
500L);
+ if (keyDeserializer instanceof TimeWindowedDeserializer) {
+
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
+ Serdes.serdeFrom(innerClass).getClass().getName());
+ } else if (keyDeserializer instanceof SessionWindowedDeserializer) {
+
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return
IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
@@ -1123,7 +1129,7 @@ public class KStreamAggregationIntegrationTest {
"--property", "key.deserializer=" +
keyDeserializer.getClass().getName(),
"--property", "value.deserializer=" +
valueDeserializer.getClass().getName(),
"--property", "key.separator=" + keySeparator,
- "--property", "key.deserializer." +
StreamsConfig.WINDOWED_INNER_CLASS_SERDE + "=" +
Serdes.serdeFrom(innerClass).getClass().getName(),
+ "--property", "key.deserializer." +
TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS + "=" +
Serdes.serdeFrom(innerClass).getClass().getName(),
"--property", "key.deserializer.window.size.ms=500",
};
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
index d97d85a6af3..c9b498d4054 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.TestUtils;
@@ -262,7 +263,7 @@ public class SelfJoinUpgradeIntegrationTest {
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
- consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
+ consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG,
500L);
final List<KeyValueTimestamp<K, V>> actual =
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
index 5f972f7c4f4..2954fa9806d 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
@@ -444,9 +444,12 @@ public class SlidingWindowedKStreamIntegrationTest {
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializer.getClass().getName());
- consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG,
windowSize);
- if (keyDeserializer instanceof TimeWindowedDeserializer ||
keyDeserializer instanceof SessionWindowedDeserializer) {
-
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
+ consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG,
windowSize);
+ if (keyDeserializer instanceof TimeWindowedDeserializer) {
+
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
+ Serdes.serdeFrom(innerClass).getClass().getName());
+ } else if (keyDeserializer instanceof SessionWindowedDeserializer) {
+
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return
IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
index 14a07e569ee..53746cd7fd6 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -478,9 +478,12 @@ public class TimeWindowedKStreamIntegrationTest {
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializer.getClass().getName());
- consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG,
windowSize);
- if (keyDeserializer instanceof TimeWindowedDeserializer ||
keyDeserializer instanceof SessionWindowedDeserializer) {
-
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
+ consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG,
windowSize);
+ if (keyDeserializer instanceof TimeWindowedDeserializer) {
+
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
+ Serdes.serdeFrom(innerClass).getClass().getName());
+ } else if (keyDeserializer instanceof SessionWindowedDeserializer) {
+
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return
IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 6aedae59576..b98806590a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -45,6 +45,10 @@ import
org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.UpgradeFromValues;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.SessionWindowedSerializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
@@ -829,13 +833,28 @@ public class StreamsConfig extends AbstractConfig {
+ CONFIG_ERROR_MSG
+ "\"NO_OPTIMIZATION\" by default.";
- /** {@code windowed.inner.class.serde} */
+ /**
+ * {@code windowed.inner.class.serde}
+ *
+ * @deprecated since 4.1.0.
+ * Use {@link TimeWindowedSerializer#WINDOWED_INNER_SERIALIZER_CLASS} for
{@link TimeWindowedSerializer}.
+ * Use {@link TimeWindowedDeserializer#WINDOWED_INNER_DESERIALIZER_CLASS}
for {@link TimeWindowedDeserializer}.
+ * Use {@link SessionWindowedSerializer#WINDOWED_INNER_SERIALIZER_CLASS}
for {@link SessionWindowedSerializer}.
+ * Use {@link
SessionWindowedDeserializer#WINDOWED_INNER_DESERIALIZER_CLASS} for {@link
SessionWindowedDeserializer}.
+ */
+ @Deprecated
public static final String WINDOWED_INNER_CLASS_SERDE =
"windowed.inner.class.serde";
private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default
serializer / deserializer for the inner class of a windowed record. Must
implement the " +
"<code>org.apache.kafka.common.serialization.Serde</code> interface.
Note that setting this config in KafkaStreams application would result " +
"in an error as it is meant to be used only from Plain consumer
client.";
- /** {@code window.size.ms} */
+ /**
+ * {@code window.size.ms}
+ *
+ * @deprecated since 4.1.0.
+ * Use {@link TimeWindowedDeserializer#WINDOW_SIZE_MS_CONFIG} for {@link
TimeWindowedDeserializer}.
+ */
+ @Deprecated
public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
private static final String WINDOW_SIZE_MS_DOC = "Sets window size for the
deserializer in order to calculate window end times.";
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
index e77efe799cd..11795459c9c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
@@ -23,10 +23,20 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.internals.SessionKeySchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Map;
public class SessionWindowedDeserializer<T> implements
Deserializer<Windowed<T>> {
+ /**
+ * Serde class whose deserializer is used for the inner type of a windowed
record. Must implement the {@link Serde} interface.
+ */
+ public static final String WINDOWED_INNER_DESERIALIZER_CLASS =
"windowed.inner.deserializer.class";
+
+ private final Logger log =
LoggerFactory.getLogger(SessionWindowedDeserializer.class);
+
private Deserializer<T> inner;
// Default constructor needed by Kafka
@@ -36,34 +46,43 @@ public class SessionWindowedDeserializer<T> implements
Deserializer<Windowed<T>>
this.inner = inner;
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"deprecation", "unchecked"})
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
- final String windowedInnerClassSerdeConfig = (String)
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
-
- Serde<T> windowInnerClassSerde = null;
+ String deserializerConfigKey = WINDOWED_INNER_DESERIALIZER_CLASS;
+ String deserializerConfigValue = (String)
configs.get(WINDOWED_INNER_DESERIALIZER_CLASS);
+ if (deserializerConfigValue == null) {
+ final String windowedInnerClassSerdeConfig = (String)
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
+ if (windowedInnerClassSerdeConfig != null) {
+ deserializerConfigKey =
StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
+ deserializerConfigValue = windowedInnerClassSerdeConfig;
+ log.warn("Config {} is deprecated. Please use {} instead.",
+ StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
WINDOWED_INNER_DESERIALIZER_CLASS);
+ }
+ }
- if (windowedInnerClassSerdeConfig != null) {
+ Serde<T> windowedInnerDeserializerClass = null;
+ if (deserializerConfigValue != null) {
try {
- windowInnerClassSerde =
Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
+ windowedInnerDeserializerClass =
Utils.newInstance(deserializerConfigValue, Serde.class);
} catch (final ClassNotFoundException e) {
- throw new
ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
windowedInnerClassSerdeConfig,
- "Serde class " + windowedInnerClassSerdeConfig + " could
not be found.");
+ throw new ConfigException(deserializerConfigKey,
deserializerConfigValue,
+ "Serde class " + deserializerConfigValue + " could not be
found.");
}
}
- if (inner != null && windowedInnerClassSerdeConfig != null) {
- if
(!inner.getClass().getName().equals(windowInnerClassSerde.deserializer().getClass().getName()))
{
+ if (inner != null && deserializerConfigValue != null) {
+ if
(!inner.getClass().getName().equals(windowedInnerDeserializerClass.deserializer().getClass().getName()))
{
throw new IllegalArgumentException("Inner class deserializer
set using constructor "
+ "(" + inner.getClass().getName() + ")" +
- " is different from the one set in
windowed.inner.class.serde config " +
- "(" +
windowInnerClassSerde.deserializer().getClass().getName() + ").");
+ " is different from the one set in " +
deserializerConfigKey + " config " +
+ "(" +
windowedInnerDeserializerClass.deserializer().getClass().getName() + ").");
}
- } else if (inner == null && windowedInnerClassSerdeConfig == null) {
+ } else if (inner == null && deserializerConfigValue == null) {
throw new IllegalArgumentException("Inner class deserializer
should be set either via constructor " +
- "or via the windowed.inner.class.serde config");
+ "or via the " + WINDOWED_INNER_DESERIALIZER_CLASS + " config");
} else if (inner == null)
- inner = windowInnerClassSerde.deserializer();
+ inner = windowedInnerDeserializerClass.deserializer();
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
index 6ec10bf8668..60e2b81d497 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
@@ -24,10 +24,20 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.state.internals.SessionKeySchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Map;
public class SessionWindowedSerializer<T> implements WindowedSerializer<T> {
+ /**
+ * Serde class whose serializer is used for the inner type of a windowed record.
Must implement the {@link Serde} interface.
+ */
+ public static final String WINDOWED_INNER_SERIALIZER_CLASS =
"windowed.inner.serializer.class";
+
+ private final Logger log =
LoggerFactory.getLogger(SessionWindowedSerializer.class);
+
private Serializer<T> inner;
// Default constructor needed by Kafka
@@ -37,32 +47,42 @@ public class SessionWindowedSerializer<T> implements
WindowedSerializer<T> {
this.inner = inner;
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"deprecation", "unchecked"})
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
- final String windowedInnerClassSerdeConfig = (String)
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
- Serde<T> windowInnerClassSerde = null;
- if (windowedInnerClassSerdeConfig != null) {
+ String serializerConfigKey = WINDOWED_INNER_SERIALIZER_CLASS;
+ String serializerConfigValue = (String)
configs.get(WINDOWED_INNER_SERIALIZER_CLASS);
+ if (serializerConfigValue == null) {
+ final String windowedInnerClassSerdeConfig = (String)
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
+ if (windowedInnerClassSerdeConfig != null) {
+ serializerConfigKey = StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
+ serializerConfigValue = windowedInnerClassSerdeConfig;
+ log.warn("Config {} is deprecated. Please use {} instead.",
+ StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
WINDOWED_INNER_SERIALIZER_CLASS);
+ }
+ }
+ Serde<T> windowedInnerSerializerClass = null;
+ if (serializerConfigValue != null) {
try {
- windowInnerClassSerde =
Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
+ windowedInnerSerializerClass =
Utils.newInstance(serializerConfigValue, Serde.class);
} catch (final ClassNotFoundException e) {
- throw new
ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
windowedInnerClassSerdeConfig,
- "Serde class " + windowedInnerClassSerdeConfig + " could
not be found.");
+ throw new ConfigException(serializerConfigKey,
serializerConfigValue,
+ "Serde class " + serializerConfigValue + " could not be
found.");
}
}
- if (inner != null && windowedInnerClassSerdeConfig != null) {
- if
(!inner.getClass().getName().equals(windowInnerClassSerde.serializer().getClass().getName()))
{
+ if (inner != null && serializerConfigValue != null) {
+ if
(!inner.getClass().getName().equals(windowedInnerSerializerClass.serializer().getClass().getName()))
{
throw new IllegalArgumentException("Inner class serializer set
using constructor "
+ "(" + inner.getClass().getName() + ")" +
- " is different from the one set in
windowed.inner.class.serde config " +
- "(" +
windowInnerClassSerde.serializer().getClass().getName() + ").");
+ " is different from the one set in " + serializerConfigKey
+ " config " +
+ "(" +
windowedInnerSerializerClass.serializer().getClass().getName() + ").");
}
- } else if (inner == null && windowedInnerClassSerdeConfig == null) {
+ } else if (inner == null && serializerConfigValue == null) {
throw new IllegalArgumentException("Inner class serializer should
be set either via constructor " +
- "or via the windowed.inner.class.serde config");
+ "or via the " + WINDOWED_INNER_SERIALIZER_CLASS + " config");
} else if (inner == null)
- inner = windowInnerClassSerde.serializer();
+ inner = windowedInnerSerializerClass.serializer();
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
index 77825f2e1fa..26fcbac785f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
@@ -23,10 +23,25 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Map;
public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
+ /**
+ * Sets window size for the deserializer in order to calculate window end
times.
+ */
+ public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
+
+ /**
+ * Serde class whose deserializer is used for the inner type of a windowed
record. Must implement the {@link Serde} interface.
+ */
+ public static final String WINDOWED_INNER_DESERIALIZER_CLASS =
"windowed.inner.deserializer.class";
+
+ private final Logger log =
LoggerFactory.getLogger(TimeWindowedDeserializer.class);
+
private Long windowSize;
private boolean isChangelogTopic;
@@ -47,50 +62,10 @@ public class TimeWindowedDeserializer<T> implements
Deserializer<Windowed<T>> {
return this.windowSize;
}
- @SuppressWarnings("unchecked")
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
- //check to see if the window size config is set and the window size is
already set from the constructor
- final Long configWindowSize;
- if (configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG) instanceof
String) {
- configWindowSize = Long.parseLong((String)
configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG));
- } else {
- configWindowSize = (Long)
configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG);
- }
- if (windowSize != null && configWindowSize != null) {
- throw new IllegalArgumentException("Window size should not be set
in both the time windowed deserializer constructor and the window.size.ms
config");
- } else if (windowSize == null && configWindowSize == null) {
- throw new IllegalArgumentException("Window size needs to be set
either through the time windowed deserializer " +
- "constructor or the window.size.ms config but not both");
- } else {
- windowSize = windowSize == null ? configWindowSize : windowSize;
- }
-
- final String windowedInnerClassSerdeConfig = (String)
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
-
- Serde<T> windowInnerClassSerde = null;
-
- if (windowedInnerClassSerdeConfig != null) {
- try {
- windowInnerClassSerde =
Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
- } catch (final ClassNotFoundException e) {
- throw new
ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
windowedInnerClassSerdeConfig,
- "Serde class " + windowedInnerClassSerdeConfig + " could
not be found.");
- }
- }
-
- if (inner != null && windowedInnerClassSerdeConfig != null) {
- if
(!inner.getClass().getName().equals(windowInnerClassSerde.deserializer().getClass().getName()))
{
- throw new IllegalArgumentException("Inner class deserializer
set using constructor "
- + "(" + inner.getClass().getName() + ")" +
- " is different from the one set in
windowed.inner.class.serde config " +
- "(" +
windowInnerClassSerde.deserializer().getClass().getName() + ").");
- }
- } else if (inner == null && windowedInnerClassSerdeConfig == null) {
- throw new IllegalArgumentException("Inner class deserializer
should be set either via constructor " +
- "or via the windowed.inner.class.serde config");
- } else if (inner == null)
- inner = windowInnerClassSerde.deserializer();
+ configureWindowSizeMs(configs);
+ configureWindowInnerDeserializerClass(configs);
}
@Override
@@ -125,4 +100,60 @@ public class TimeWindowedDeserializer<T> implements
Deserializer<Windowed<T>> {
Deserializer<T> innerDeserializer() {
return inner;
}
+
+ private void configureWindowSizeMs(final Map<String, ?> configs) {
+ //check to see if the window size config is set and the window size is
already set from the constructor
+ final Long configWindowSize;
+ if (configs.get(WINDOW_SIZE_MS_CONFIG) instanceof String) {
+ configWindowSize = Long.parseLong((String)
configs.get(WINDOW_SIZE_MS_CONFIG));
+ } else {
+ configWindowSize = (Long) configs.get(WINDOW_SIZE_MS_CONFIG);
+ }
+ if (windowSize != null && configWindowSize != null) {
+ throw new IllegalArgumentException("Window size should not be set
in both the time windowed deserializer constructor and the window.size.ms
config");
+ } else if (windowSize == null && configWindowSize == null) {
+ throw new IllegalArgumentException("Window size needs to be set
either through the time windowed deserializer " +
+ "constructor or the window.size.ms config but not both");
+ } else {
+ windowSize = windowSize == null ? configWindowSize : windowSize;
+ }
+ }
+
+ @SuppressWarnings({"deprecation", "unchecked"})
+ private void configureWindowInnerDeserializerClass(final Map<String, ?>
configs) {
+ String deserializerConfigKey = WINDOWED_INNER_DESERIALIZER_CLASS;
+ String deserializerConfigValue = (String)
configs.get(WINDOWED_INNER_DESERIALIZER_CLASS);
+ if (deserializerConfigValue == null) {
+ final String windowedInnerClassSerdeConfig = (String)
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
+ if (windowedInnerClassSerdeConfig != null) {
+ deserializerConfigKey =
StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
+ deserializerConfigValue = windowedInnerClassSerdeConfig;
+ log.warn("Config {} is deprecated. Please use {} instead.",
+ StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
WINDOWED_INNER_DESERIALIZER_CLASS);
+ }
+ }
+
+ Serde<T> windowedInnerDeserializerClass = null;
+ if (deserializerConfigValue != null) {
+ try {
+ windowedInnerDeserializerClass =
Utils.newInstance(deserializerConfigValue, Serde.class);
+ } catch (final ClassNotFoundException e) {
+ throw new ConfigException(deserializerConfigKey,
deserializerConfigValue,
+ "Serde class " + deserializerConfigValue + " could not be
found.");
+ }
+ }
+
+ if (inner != null && deserializerConfigValue != null) {
+ if
(!inner.getClass().getName().equals(windowedInnerDeserializerClass.deserializer().getClass().getName()))
{
+ throw new IllegalArgumentException("Inner class deserializer
set using constructor "
+ + "(" + inner.getClass().getName() + ")" +
+ " is different from the one set in " +
deserializerConfigKey + " config " +
+ "(" +
windowedInnerDeserializerClass.deserializer().getClass().getName() + ").");
+ }
+ } else if (inner == null && deserializerConfigValue == null) {
+ throw new IllegalArgumentException("Inner class deserializer
should be set either via constructor " +
+ "or via the " + WINDOWED_INNER_DESERIALIZER_CLASS + " config");
+ } else if (inner == null)
+ inner = windowedInnerDeserializerClass.deserializer();
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
index 54bdd6a3b71..7cd13afc1d3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
@@ -24,10 +24,20 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Map;
public class TimeWindowedSerializer<T> implements WindowedSerializer<T> {
+ /**
+ * Serde class whose serializer is used for the inner type of a windowed record.
Must implement the {@link Serde} interface.
+ */
+ public static final String WINDOWED_INNER_SERIALIZER_CLASS =
"windowed.inner.serializer.class";
+
+ private final Logger log =
LoggerFactory.getLogger(TimeWindowedSerializer.class);
+
private Serializer<T> inner;
// Default constructor needed by Kafka
@@ -38,32 +48,42 @@ public class TimeWindowedSerializer<T> implements
WindowedSerializer<T> {
this.inner = inner;
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"deprecation", "unchecked"})
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
- final String windowedInnerClassSerdeConfig = (String)
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
- Serde<T> windowInnerClassSerde = null;
- if (windowedInnerClassSerdeConfig != null) {
+ String serializerConfigKey = WINDOWED_INNER_SERIALIZER_CLASS;
+ String serializerConfigValue = (String)
configs.get(WINDOWED_INNER_SERIALIZER_CLASS);
+ if (serializerConfigValue == null) {
+ final String windowedInnerClassSerdeConfig = (String)
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
+ if (windowedInnerClassSerdeConfig != null) {
+ serializerConfigKey = StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
+ serializerConfigValue = windowedInnerClassSerdeConfig;
+ log.warn("Config {} is deprecated. Please use {} instead.",
+ StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
WINDOWED_INNER_SERIALIZER_CLASS);
+ }
+ }
+ Serde<T> windowedInnerSerializerClass = null;
+ if (serializerConfigValue != null) {
try {
- windowInnerClassSerde =
Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
+ windowedInnerSerializerClass =
Utils.newInstance(serializerConfigValue, Serde.class);
} catch (final ClassNotFoundException e) {
- throw new
ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
windowedInnerClassSerdeConfig,
- "Serde class " + windowedInnerClassSerdeConfig + " could
not be found.");
+ throw new ConfigException(serializerConfigKey,
serializerConfigValue,
+ "Serde class " + serializerConfigValue + " could not be
found.");
}
}
- if (inner != null && windowedInnerClassSerdeConfig != null) {
- if
(!inner.getClass().getName().equals(windowInnerClassSerde.serializer().getClass().getName()))
{
+ if (inner != null && serializerConfigValue != null) {
+ if
(!inner.getClass().getName().equals(windowedInnerSerializerClass.serializer().getClass().getName()))
{
throw new IllegalArgumentException("Inner class serializer set
using constructor "
+ "(" + inner.getClass().getName() + ")" +
- " is different from the one set in
windowed.inner.class.serde config " +
- "(" +
windowInnerClassSerde.serializer().getClass().getName() + ").");
+ " is different from the one set in " + serializerConfigKey
+ " config " +
+ "(" +
windowedInnerSerializerClass.serializer().getClass().getName() + ").");
}
- } else if (inner == null && windowedInnerClassSerdeConfig == null) {
+ } else if (inner == null && serializerConfigValue == null) {
throw new IllegalArgumentException("Inner class serializer should
be set either via constructor " +
- "or via the windowed.inner.class.serde config");
+ "or via the " + WINDOWED_INNER_SERIALIZER_CLASS + " config");
} else if (inner == null)
- inner = windowInnerClassSerde.serializer();
+ inner = windowedInnerSerializerClass.serializer();
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
index 82f0e355307..2bace901779 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
@@ -45,7 +45,7 @@ public class SessionWindowedDeserializerTest {
}
@Test
- public void shouldSetWindowedInnerClassDeserialiserThroughConfig() {
+ public void shouldSetDeserializerThroughWindowedInnerClassSerdeConfig() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
Serdes.ByteArraySerde.class.getName());
final SessionWindowedDeserializer<?> deserializer = new
SessionWindowedDeserializer<>();
deserializer.configure(props, false);
@@ -53,20 +53,49 @@ public class SessionWindowedDeserializerTest {
}
@Test
- public void shouldThrowErrorIfWindowInnerClassDeserialiserIsNotSet() {
+ public void
shouldSetDeserializerThroughWindowedInnerDeserializerClassConfig() {
+
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.ByteArraySerde.class.getName());
+ final SessionWindowedDeserializer<?> deserializer = new
SessionWindowedDeserializer<>();
+ deserializer.configure(props, false);
+ assertInstanceOf(ByteArrayDeserializer.class,
deserializer.innerDeserializer());
+ }
+
+ @Test
+ public void
shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerDeserializerClassConfigIsSet()
{
+
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.ByteArraySerde.class.getName());
+ props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
"some.non.existent.class");
+ final SessionWindowedDeserializer<?> deserializer = new
SessionWindowedDeserializer<>();
+ deserializer.configure(props, false);
+ assertInstanceOf(ByteArrayDeserializer.class,
deserializer.innerDeserializer());
+ }
+
+ @Test
+ public void
shouldThrowErrorIfWindowedInnerClassSerdeAndWindowedInnerDeserializerClassAreNotSet()
{
final SessionWindowedDeserializer<?> deserializer = new
SessionWindowedDeserializer<>();
assertThrows(IllegalArgumentException.class, () ->
deserializer.configure(props, false));
}
@Test
- public void
shouldThrowErrorIfDeserialisersConflictInConstructorAndConfig() {
+ public void
shouldThrowErrorIfDeserializersConflictInConstructorAndWindowedInnerClassSerdeConfig()
{
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
Serdes.ByteArraySerde.class.getName());
assertThrows(IllegalArgumentException.class, () ->
sessionWindowedDeserializer.configure(props, false));
}
@Test
- public void
shouldThrowConfigExceptionWhenInvalidWindowInnerClassDeserialiserSupplied() {
+ public void
shouldThrowErrorIfDeserializersConflictInConstructorAndWindowedInnerDeserializerClassConfig()
{
+
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.ByteArraySerde.class.getName());
+ assertThrows(IllegalArgumentException.class, () ->
sessionWindowedDeserializer.configure(props, false));
+ }
+
+ @Test
+ public void
shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
"some.non.existent.class");
assertThrows(ConfigException.class, () ->
sessionWindowedDeserializer.configure(props, false));
}
+
+ @Test
+ public void
shouldThrowConfigExceptionWhenInvalidWindowedInnerDeserializerClassSupplied() {
+
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
"some.non.existent.class");
+ assertThrows(ConfigException.class, () ->
sessionWindowedDeserializer.configure(props, false));
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
index 729c8408dfe..d7e30bc3fe4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
@@ -45,7 +45,7 @@ public class SessionWindowedSerializerTest {
}
@Test
- public void shouldSetWindowedInnerClassSerialiserThroughConfig() {
+ public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
Serdes.ByteArraySerde.class.getName());
final SessionWindowedSerializer<?> serializer = new
SessionWindowedSerializer<>();
serializer.configure(props, false);
@@ -53,20 +53,49 @@ public class SessionWindowedSerializerTest {
}
@Test
- public void shouldThrowErrorIfWindowInnerClassSerialiserIsNotSet() {
+ public void shouldSetSerializerThroughWindowedInnerSerializerClassConfig()
{
+ props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS,
Serdes.ByteArraySerde.class.getName());
+ final SessionWindowedSerializer<?> serializer = new
SessionWindowedSerializer<>();
+ serializer.configure(props, false);
+ assertInstanceOf(ByteArraySerializer.class,
serializer.innerSerializer());
+ }
+
+ @Test
+ public void
shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerSerializerClassConfigIsSet()
{
+ props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS,
Serdes.ByteArraySerde.class.getName());
+ props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
"some.non.existent.class");
+ final SessionWindowedSerializer<?> serializer = new
SessionWindowedSerializer<>();
+ serializer.configure(props, false);
+ assertInstanceOf(ByteArraySerializer.class,
serializer.innerSerializer());
+ }
+
+ @Test
+ public void
shouldThrowErrorIfWindowedInnerClassSerdeAndWindowedInnerSerializerClassAreNotSet()
{
final SessionWindowedSerializer<?> serializer = new
SessionWindowedSerializer<>();
assertThrows(IllegalArgumentException.class, () ->
serializer.configure(props, false));
}
@Test
- public void shouldThrowErrorIfSerialisersConflictInConstructorAndConfig() {
+ public void
shouldThrowErrorIfSerializersConflictInConstructorAndWindowedInnerClassSerdeConfig()
{
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
Serdes.ByteArraySerde.class.getName());
assertThrows(IllegalArgumentException.class, () ->
sessionWindowedSerializer.configure(props, false));
}
@Test
- public void
shouldThrowConfigExceptionWhenInvalidWindowInnerClassSerialiserSupplied() {
+ public void
shouldThrowErrorIfSerializersConflictInConstructorAndWindowedInnerSerializerClassConfig()
{
+ props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS,
Serdes.ByteArraySerde.class.getName());
+ assertThrows(IllegalArgumentException.class, () ->
sessionWindowedSerializer.configure(props, false));
+ }
+
+ @Test
+ public void
shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
"some.non.existent.class");
assertThrows(ConfigException.class, () ->
sessionWindowedSerializer.configure(props, false));
}
+
+ @Test
+ public void
shouldThrowConfigExceptionWhenInvalidWindowedInnerSerializerClassSupplied() {
+ props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS,
"some.non.existent.class");
+ assertThrows(ConfigException.class, () ->
sessionWindowedSerializer.configure(props, false));
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
index b82b48d7730..bfb8c80cf09 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
@@ -30,6 +30,7 @@ import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -49,7 +50,7 @@ public class TimeWindowedDeserializerTest {
}
@Test
- public void
shouldSetWindowSizeAndWindowedInnerDeserialiserThroughConfigs() {
+ public void
shouldSetWindowSizeAndDeserializerThroughWindowSizeMsAndWindowedInnerClassSerdeConfigs()
{
props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
Serdes.ByteArraySerde.class.getName());
final TimeWindowedDeserializer<?> deserializer = new
TimeWindowedDeserializer<>();
@@ -59,34 +60,92 @@ public class TimeWindowedDeserializerTest {
}
@Test
- public void shouldThrowErrorIfWindowSizeSetInConfigsAndConstructor() {
+ public void
shouldSetWindowSizeAndDeserializerThroughWindowSizeMsAndWindowedInnerDeserializerClassConfigs()
{
+ props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
+ props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.ByteArraySerde.class.getName());
+ final TimeWindowedDeserializer<?> deserializer = new
TimeWindowedDeserializer<>();
+ deserializer.configure(props, false);
+ assertThat(deserializer.getWindowSize(), is(500L));
+ assertInstanceOf(ByteArrayDeserializer.class,
deserializer.innerDeserializer());
+ }
+
+ @Test
+ public void shouldHaveSameConfigNameForWindowSizeMs() {
+ assertEquals(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG,
StreamsConfig.WINDOW_SIZE_MS_CONFIG);
+ }
+
+ @Test
+ public void
shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerDeserializerClassConfigIsSet()
{
+ props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
+ props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.ByteArraySerde.class.getName());
+ props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
"some.non.existent.class");
+ final TimeWindowedDeserializer<?> deserializer = new
TimeWindowedDeserializer<>();
+ deserializer.configure(props, false);
+ assertThat(deserializer.getWindowSize(), is(500L));
+ assertInstanceOf(ByteArrayDeserializer.class,
deserializer.innerDeserializer());
+ }
+
+ @Test
+ public void shouldThrowErrorIfWindowSizeSetInStreamsConfigAndConstructor()
{
props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
assertThrows(IllegalArgumentException.class, () ->
timeWindowedDeserializer.configure(props, false));
}
@Test
- public void shouldThrowErrorIfWindowSizeIsNotSet() {
+ public void
shouldThrowErrorIfWindowSizeSetInConstructorConfigAndConstructor() {
+ props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
+ assertThrows(IllegalArgumentException.class, () ->
timeWindowedDeserializer.configure(props, false));
+ }
+
+ @Test
+ public void
shouldThrowErrorIfWindowSizeIsNotSetAndWindowedInnerClassSerdeIsSet() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
Serdes.ByteArraySerde.class.getName());
final TimeWindowedDeserializer<?> deserializer = new
TimeWindowedDeserializer<>();
assertThrows(IllegalArgumentException.class, () ->
deserializer.configure(props, false));
}
@Test
- public void shouldThrowErrorIfWindowedInnerClassDeserialiserIsNotSet() {
+ public void
shouldThrowErrorIfWindowSizeIsNotSetAndWindowedInnerDeserializerClassIsSet() {
+ props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.ByteArraySerde.class.getName());
+ final TimeWindowedDeserializer<?> deserializer = new
TimeWindowedDeserializer<>();
+ assertThrows(IllegalArgumentException.class, () ->
deserializer.configure(props, false));
+ }
+
+ @Test
+ public void
shouldThrowErrorIfWindowedInnerClassSerdeIsNotSetAndWindowSizeMsInStreamsConfigIsSet()
{
props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
final TimeWindowedDeserializer<?> deserializer = new
TimeWindowedDeserializer<>();
assertThrows(IllegalArgumentException.class, () ->
deserializer.configure(props, false));
}
@Test
- public void
shouldThrowErrorIfWindowedInnerClassDeserialisersConflictInConstructorAndConfig()
{
+ public void
shouldThrowErrorIfWindowedInnerClassSerdeIsNotSetAndWindowSizeMsInConstructorConfigIsSet()
{
+ props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
+ final TimeWindowedDeserializer<?> deserializer = new
TimeWindowedDeserializer<>();
+ assertThrows(IllegalArgumentException.class, () ->
deserializer.configure(props, false));
+ }
+
+ @Test
+ public void
shouldThrowErrorIfDeserializerConflictInConstructorAndWindowedInnerClassSerdeConfig()
{
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
Serdes.ByteArraySerde.class.getName());
assertThrows(IllegalArgumentException.class, () ->
timeWindowedDeserializer.configure(props, false));
}
@Test
- public void
shouldThrowConfigExceptionWhenInvalidWindowedInnerClassDeserialiserSupplied() {
+ public void
shouldThrowErrorIfDeserializerConflictInConstructorAndWindowedInnerDeserializerClassConfig()
{
+ props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.ByteArraySerde.class.getName());
+ assertThrows(IllegalArgumentException.class, () ->
timeWindowedDeserializer.configure(props, false));
+ }
+
+ @Test
+ public void
shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
"some.non.existent.class");
assertThrows(ConfigException.class, () ->
timeWindowedDeserializer.configure(props, false));
}
+
+ @Test
+ public void
shouldThrowConfigExceptionWhenInvalidWindowedInnerDeserializerClassSupplied() {
+ props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
"some.non.existent.class");
+ assertThrows(ConfigException.class, () ->
timeWindowedDeserializer.configure(props, false));
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
index ba5b9c339a4..7a13117db4a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
@@ -45,7 +45,7 @@ public class TimeWindowedSerializerTest {
}
@Test
- public void shouldSetWindowedInnerClassSerialiserThroughConfig() {
+ public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
Serdes.ByteArraySerde.class.getName());
final TimeWindowedSerializer<?> serializer = new
TimeWindowedSerializer<>();
serializer.configure(props, false);
@@ -53,21 +53,49 @@ public class TimeWindowedSerializerTest {
}
@Test
- public void shouldThrowErrorIfWindowedInnerClassSerialiserIsNotSet() {
+ public void shouldSetSerializerThroughWindowedInnerSerializerClassConfig()
{
+ props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS,
Serdes.ByteArraySerde.class.getName());
+ final TimeWindowedSerializer<?> serializer = new
TimeWindowedSerializer<>();
+ serializer.configure(props, false);
+ assertInstanceOf(ByteArraySerializer.class,
serializer.innerSerializer());
+ }
+
+ @Test
+ public void
shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerSerializerClassConfigIsSet()
{
+ props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS,
Serdes.ByteArraySerde.class.getName());
+ props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
"some.non.existent.class");
+ final TimeWindowedSerializer<?> serializer = new
TimeWindowedSerializer<>();
+ serializer.configure(props, false);
+ assertInstanceOf(ByteArraySerializer.class,
serializer.innerSerializer());
+ }
+
+ @Test
+ public void
shouldThrowErrorIfWindowedInnerClassSerdeAndWindowedInnerSerializerClassAreNotSet()
{
final TimeWindowedSerializer<?> serializer = new
TimeWindowedSerializer<>();
assertThrows(IllegalArgumentException.class, () ->
serializer.configure(props, false));
}
@Test
- public void
shouldThrowErrorIfWindowedInnerClassSerialisersConflictInConstructorAndConfig()
{
+ public void
shouldThrowErrorIfSerializerConflictInConstructorAndWindowedInnerClassSerdeConfig()
{
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
Serdes.ByteArraySerde.class.getName());
assertThrows(IllegalArgumentException.class, () ->
timeWindowedSerializer.configure(props, false));
}
@Test
- public void
shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerialiserSupplied() {
+ public void
shouldThrowErrorIfSerializerConflictInConstructorAndWindowedInnerSerializerClassConfig()
{
+ props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS,
Serdes.ByteArraySerde.class.getName());
+ assertThrows(IllegalArgumentException.class, () ->
timeWindowedSerializer.configure(props, false));
+ }
+
+ @Test
+ public void
shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
"some.non.existent.class");
assertThrows(ConfigException.class, () ->
timeWindowedSerializer.configure(props, false));
}
+ @Test
+ public void
shouldThrowConfigExceptionWhenInvalidWindowedInnerSerializerClassSupplied() {
+ props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS,
"some.non.existent.class");
+ assertThrows(ConfigException.class, () ->
timeWindowedSerializer.configure(props, false));
+ }
}