This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2e8bae8c7ec MINOR: Fix SubscriptionResponseWrapperSerializer (#17205)
2e8bae8c7ec is described below
commit 2e8bae8c7ec643234396b81f0862a6a0c9e0a526
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Sep 23 19:32:30 2024 -0700
MINOR: Fix SubscriptionResponseWrapperSerializer (#17205)
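The existing check is not correct: Java's `byte` is signed, with a range of
-128...127, so the old condition `Byte.compare((byte) 0x7F, data.version()) < 0`
(i.e. "version > 0x7F") could never be true and the guard never fired.
This PR fixes the check to use `data.version() < 0`, which rejects any version
whose high bit is set and therefore does not fit into the 7-bit version field.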
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../streams/errors/ProductionExceptionHandler.java | 2 -
.../SubscriptionResponseWrapperSerde.java | 5 +-
.../SubscriptionResponseWrapperSerdeTest.java | 95 ++++++++++++++--------
3 files changed, 61 insertions(+), 41 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
index 939b1ecbcd6..02837b9dd80 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
@@ -46,7 +46,6 @@ public interface ProductionExceptionHandler extends
Configurable {
* @param record The record that failed to produce
* @param exception The exception that occurred during production
*/
- @SuppressWarnings("deprecation")
default ProductionExceptionHandlerResponse handle(final
ErrorHandlerContext context,
final
ProducerRecord<byte[], byte[]> record,
final Exception
exception) {
@@ -76,7 +75,6 @@ public interface ProductionExceptionHandler extends
Configurable {
* @param exception the exception that occurred during serialization
* @param origin the origin of the serialization exception
*/
- @SuppressWarnings("deprecation")
default ProductionExceptionHandlerResponse
handleSerializationException(final ErrorHandlerContext context,
final ProducerRecord record,
final Exception exception,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 9e143d4e296..72a918e455f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -66,9 +66,8 @@ public class SubscriptionResponseWrapperSerde<V> implements
Serde<SubscriptionRe
public byte[] serialize(final String topic, final
SubscriptionResponseWrapper<V> data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized
data}
- //7-bit (0x7F) maximum for data version.
- if (Byte.compare((byte) 0x7F, data.version()) < 0) {
- throw new
UnsupportedVersionException("SubscriptionResponseWrapper version is larger than
maximum supported 0x7F");
+ if (data.version() < 0) {
+ throw new
UnsupportedVersionException("SubscriptionResponseWrapper version cannot be
negative");
}
final byte[] serializedData = data.foreignValue() == null ? null :
serializer.serialize(topic, data.foreignValue());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
index 9be5e39cc15..276600fd106 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
@@ -42,14 +42,10 @@ public class SubscriptionResponseWrapperSerdeTest {
}
@Override
- public void configure(final Map<String, ?> configs, final boolean
isKey) {
-
- }
+ public void configure(final Map<String, ?> configs, final boolean
isKey) { }
@Override
- public void close() {
-
- }
+ public void close() { }
@Override
public Serializer<T> serializer() {
@@ -73,68 +69,95 @@ public class SubscriptionResponseWrapperSerdeTest {
}
@Test
- @SuppressWarnings("unchecked")
public void ShouldSerdeWithNonNullsTest() {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01,
(byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
- final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
- final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
- final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+ final byte[] serResponse = srwSerde.serializer().serialize(null,
srw);
+ final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.originalValueHash());
- assertEquals(foreignValue, result.foreignValue());
- assertNull(result.primaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertEquals(foreignValue, result.foreignValue());
+ assertNull(result.primaryPartition());
+ }
}
@Test
- @SuppressWarnings("unchecked")
public void shouldSerdeWithNullForeignValueTest() {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01,
(byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final SubscriptionResponseWrapper<String> srw = new
SubscriptionResponseWrapper<>(hashedValue, null, 1);
- final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
- final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
- final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+ final byte[] serResponse = srwSerde.serializer().serialize(null,
srw);
+ final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.originalValueHash());
- assertNull(result.foreignValue());
- assertNull(result.primaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertNull(result.foreignValue());
+ assertNull(result.primaryPartition());
+ }
}
@Test
- @SuppressWarnings("unchecked")
public void shouldSerdeWithNullHashTest() {
final long[] hashedValue = null;
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
- final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
- final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
- final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+ final byte[] serResponse = srwSerde.serializer().serialize(null,
srw);
+ final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.originalValueHash());
- assertEquals(foreignValue, result.foreignValue());
- assertNull(result.primaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertEquals(foreignValue, result.foreignValue());
+ assertNull(result.primaryPartition());
+ }
}
@Test
- @SuppressWarnings("unchecked")
public void shouldSerdeWithNullsTest() {
final long[] hashedValue = null;
final String foreignValue = null;
final SubscriptionResponseWrapper<String> srw = new
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
- final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
- final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
- final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+ final byte[] serResponse = srwSerde.serializer().serialize(null,
srw);
+ final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.originalValueHash());
- assertEquals(foreignValue, result.foreignValue());
- assertNull(result.primaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertEquals(foreignValue, result.foreignValue());
+ assertNull(result.primaryPartition());
+ }
}
@Test
public void shouldThrowExceptionWithBadVersionTest() {
final long[] hashedValue = null;
- assertThrows(UnsupportedVersionException.class,
- () -> new SubscriptionResponseWrapper<>(hashedValue,
"foreignValue", (byte) 0xFF, 1));
+ assertThrows(
+ UnsupportedVersionException.class,
+ () -> new SubscriptionResponseWrapper<>(hashedValue,
"foreignValue", (byte) -1, 1)
+ );
+ }
+
+ @Test
+ public void shouldThrowExceptionOnSerializeWhenDataVersionUnknown() {
+ final SubscriptionResponseWrapper<String> srw = new
InvalidSubscriptionResponseWrapper(null, null, 1);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(null)) {
+ assertThrows(
+ UnsupportedVersionException.class,
+ () -> srwSerde.serializer().serialize(null, srw)
+ );
+ }
+ }
+
+ public static class InvalidSubscriptionResponseWrapper extends
SubscriptionResponseWrapper<String> {
+
+ public InvalidSubscriptionResponseWrapper(final long[]
originalValueHash,
+ final String foreignValue,
+ final Integer
primaryPartition) {
+ super(originalValueHash, foreignValue, primaryPartition);
+ }
+
+ @Override
+ public byte version() {
+ return -1;
+ }
}
}
\ No newline at end of file