This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 44d2741b4ce MINOR: fix bug in TimeWindowedDeserializerTest (#19570) 44d2741b4ce is described below commit 44d2741b4cee581f3dcc1506c5ed8dc8f56399d0 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Wed Apr 30 17:58:22 2025 -0700 MINOR: fix bug in TimeWindowedDeserializerTest (#19570) Test throws `NumberFormatException` and thus still passed as this exception extends `IllegalArgumentException`. However, the test does not verify what it is supposed to verify. Piggybacking some code cleanup to all related files. Reviewers: PoAn Yang <pay...@apache.org>, Ken Huang <s7133...@gmail.com>, Bill Bejeck <b...@confluent.io> --- .../kstream/SessionWindowedDeserializerTest.java | 30 +++++++----- .../kstream/SessionWindowedSerializerTest.java | 30 +++++++----- .../kstream/TimeWindowedDeserializerTest.java | 57 ++++++++++++++-------- .../kstream/TimeWindowedSerializerTest.java | 30 +++++++----- 4 files changed, 93 insertions(+), 54 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java index 2bace901779..0315120ae1b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java @@ -44,37 +44,44 @@ public class SessionWindowedDeserializerTest { assertInstanceOf(StringDeserializer.class, inner, "Inner deserializer type should be StringDeserializer"); } + @Deprecated @Test public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() { props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName()); - final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>(); - deserializer.configure(props, false); - assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer()); + try (final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>()) { + deserializer.configure(props, false); + assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer()); + } } @Test public void shouldSetSerializerThroughWindowedInnerDeserializerClassConfig() { props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName()); - final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>(); - deserializer.configure(props, false); - assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer()); + try (final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>()) { + deserializer.configure(props, false); + assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer()); + } } + @Deprecated @Test public void shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerDeserializerClassConfigIsSet() { props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName()); props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class"); - final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>(); - deserializer.configure(props, false); - assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer()); + try (final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>()) { + deserializer.configure(props, false); + assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer()); + } } @Test public void shouldThrowErrorIfWindowedInnerClassSerdeAndSessionWindowedDeserializerClassAreNotSet() { - final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>(); - assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false)); + try (final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>()) { + assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false)); + } } + @Deprecated @Test public void shouldThrowErrorIfDeserializersConflictInConstructorAndWindowedInnerClassSerdeConfig() { props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName()); @@ -87,6 +94,7 @@ public class SessionWindowedDeserializerTest { assertThrows(IllegalArgumentException.class, () -> sessionWindowedDeserializer.configure(props, false)); } + @Deprecated @Test public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() { props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class"); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java index d7e30bc3fe4..212b0c810e5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java @@ -44,37 +44,44 @@ public class SessionWindowedSerializerTest { assertInstanceOf(StringSerializer.class, inner, "Inner serializer type should be StringSerializer"); } + @Deprecated @Test public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() { props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName()); - final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>(); - serializer.configure(props, false); - assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer()); + try (final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>()) { + serializer.configure(props, false); + assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer()); + } } @Test public void shouldSetSerializerThroughWindowedInnerSerializerClassConfig() { props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName()); - final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>(); - serializer.configure(props, false); - assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer()); + try (final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>()) { + serializer.configure(props, false); + assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer()); + } } + @Deprecated @Test public void shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerSerializerClassConfigIsSet() { props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName()); props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class"); - final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>(); - serializer.configure(props, false); - assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer()); + try (final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>()) { + serializer.configure(props, false); + assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer()); + } } @Test public void shouldThrowErrorIfWindowedInnerClassSerdeAndWindowedInnerSerializerClassAreNotSet() { - final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>(); - assertThrows(IllegalArgumentException.class, () -> serializer.configure(props, false)); + try (final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>()) { + assertThrows(IllegalArgumentException.class, () -> serializer.configure(props, false)); + } } + @Deprecated @Test public void shouldThrowErrorIfSerializersConflictInConstructorAndWindowedInnerClassSerdeConfig() { props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName()); @@ -87,6 +94,7 @@ public class SessionWindowedSerializerTest { assertThrows(IllegalArgumentException.class, () -> sessionWindowedSerializer.configure(props, false)); } + @Deprecated @Test public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() { props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class"); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java index bfb8c80cf09..d2485c7785d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java @@ -49,42 +49,49 @@ public class TimeWindowedDeserializerTest { assertThat(timeWindowedDeserializer.getWindowSize(), is(5000000L)); } + @Deprecated @Test public void shouldSetWindowSizeAndDeserializerThroughWindowSizeMsAndWindowedInnerClassSerdeConfigs() { props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500"); props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName()); - final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>(); - deserializer.configure(props, false); - assertThat(deserializer.getWindowSize(), is(500L)); - assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer()); + try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) { + deserializer.configure(props, false); + assertThat(deserializer.getWindowSize(), is(500L)); + assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer()); + } } @Test public void shouldSetWindowSizeAndDeserializerThroughWindowSizeMsAndWindowedInnerDeserializerClassConfigs() { props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500"); props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName()); - final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>(); - deserializer.configure(props, false); - assertThat(deserializer.getWindowSize(), is(500L)); - assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer()); + try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) { + deserializer.configure(props, false); + assertThat(deserializer.getWindowSize(), is(500L)); + assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer()); + } } + @Deprecated @Test public void shouldHaveSameConfigNameForWindowSizeMs() { assertEquals(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, StreamsConfig.WINDOW_SIZE_MS_CONFIG); } + @Deprecated @Test public void shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerDeserializerClassConfigIsSet() { props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500"); props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName()); props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class"); - final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>(); - deserializer.configure(props, false); - assertThat(deserializer.getWindowSize(), is(500L)); - assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer()); + try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) { + deserializer.configure(props, false); + assertThat(deserializer.getWindowSize(), is(500L)); + assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer()); + } } + @Deprecated @Test public void shouldThrowErrorIfWindowSizeSetInStreamsConfigAndConstructor() { props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500"); @@ -97,34 +104,41 @@ public class TimeWindowedDeserializerTest { assertThrows(IllegalArgumentException.class, () -> timeWindowedDeserializer.configure(props, false)); } + @Deprecated @Test public void shouldThrowErrorIfWindowSizeIsNotSetAndWindowedInnerClassSerdeIsSet() { props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName()); - final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>(); - assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false)); + try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) { + assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false)); + } } @Test public void shouldThrowErrorIfWindowSizeIsNotSetAndWindowedInnerDeserializerClassIsSet() { - props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, Serdes.ByteArraySerde.class.getName()); - final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>(); - assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false)); + props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName()); + try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) { + assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false)); + } } + @Deprecated @Test public void shouldThrowErrorIfWindowedInnerClassSerdeIsNotSetAndWindowSizeMsInStreamsConfigIsSet() { props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500"); - final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>(); - assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false)); + try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) { + assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false)); + } } @Test public void shouldThrowErrorIfWindowedInnerClassSerdeIsNotSetAndWindowSizeMsInConstructorConfigIsSet() { props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500"); - final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>(); - assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false)); + try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) { + assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false)); + } } + @Deprecated @Test public void shouldThrowErrorIfDeserializerConflictInConstructorAndWindowedInnerClassSerdeConfig() { props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName()); @@ -137,6 +151,7 @@ public class TimeWindowedDeserializerTest { assertThrows(IllegalArgumentException.class, () -> timeWindowedDeserializer.configure(props, false)); } + @Deprecated @Test public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() { props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class"); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java index 7a13117db4a..5fd96f72c1b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java @@ -44,37 +44,44 @@ public class TimeWindowedSerializerTest { assertInstanceOf(StringSerializer.class, inner, "Inner serializer type should be StringSerializer"); } + @Deprecated @Test public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() { props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName()); - final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>(); - serializer.configure(props, false); - assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer()); + try (final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>()) { + serializer.configure(props, false); + assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer()); + } } @Test public void shouldSetSerializerThroughWindowedInnerSerializerClassConfig() { props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName()); - final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>(); - serializer.configure(props, false); - assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer()); + try (final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>()) { + serializer.configure(props, false); + assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer()); + } } + @Deprecated @Test public void shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerSerializerClassConfigIsSet() { props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName()); props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class"); - final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>(); - serializer.configure(props, false); - assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer()); + try (final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>()) { + serializer.configure(props, false); + assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer()); + } } @Test public void shouldThrowErrorIfWindowedInnerClassSerdeAndWindowedInnerSerializerClassAreNotSet() { - final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>(); - assertThrows(IllegalArgumentException.class, () -> serializer.configure(props, false)); + try (final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>()) { + assertThrows(IllegalArgumentException.class, () -> serializer.configure(props, false)); + } } + @Deprecated @Test public void shouldThrowErrorIfSerializerConflictInConstructorAndWindowedInnerClassSerdeConfig() { props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName()); @@ -87,6 +94,7 @@ public class TimeWindowedSerializerTest { assertThrows(IllegalArgumentException.class, () -> timeWindowedSerializer.configure(props, false)); } + @Deprecated @Test public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() { props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");