This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2e8bae8c7ec MINOR: Fix SubscriptionResponseWrapperSerializer (#17205)
2e8bae8c7ec is described below

commit 2e8bae8c7ec643234396b81f0862a6a0c9e0a526
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Sep 23 19:32:30 2024 -0700

    MINOR: Fix SubscriptionResponseWrapperSerializer (#17205)
    
    The existing check is not correct, because `byte` range is from -128...127.
    This PR fixes the check to use `< 0`.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../streams/errors/ProductionExceptionHandler.java |  2 -
 .../SubscriptionResponseWrapperSerde.java          |  5 +-
 .../SubscriptionResponseWrapperSerdeTest.java      | 95 ++++++++++++++--------
 3 files changed, 61 insertions(+), 41 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
index 939b1ecbcd6..02837b9dd80 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
@@ -46,7 +46,6 @@ public interface ProductionExceptionHandler extends 
Configurable {
      * @param record The record that failed to produce
      * @param exception The exception that occurred during production
      */
-    @SuppressWarnings("deprecation")
     default ProductionExceptionHandlerResponse handle(final 
ErrorHandlerContext context,
                                                       final 
ProducerRecord<byte[], byte[]> record,
                                                       final Exception 
exception) {
@@ -76,7 +75,6 @@ public interface ProductionExceptionHandler extends 
Configurable {
      * @param exception the exception that occurred during serialization
      * @param origin    the origin of the serialization exception
      */
-    @SuppressWarnings("deprecation")
     default ProductionExceptionHandlerResponse 
handleSerializationException(final ErrorHandlerContext context,
                                                                             
final ProducerRecord record,
                                                                             
final Exception exception,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 9e143d4e296..72a918e455f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -66,9 +66,8 @@ public class SubscriptionResponseWrapperSerde<V> implements 
Serde<SubscriptionRe
         public byte[] serialize(final String topic, final 
SubscriptionResponseWrapper<V> data) {
             
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized 
data}
 
-            //7-bit (0x7F) maximum for data version.
-            if (Byte.compare((byte) 0x7F, data.version()) < 0) {
-                throw new 
UnsupportedVersionException("SubscriptionResponseWrapper version is larger than 
maximum supported 0x7F");
+            if (data.version() < 0) {
+                throw new 
UnsupportedVersionException("SubscriptionResponseWrapper version cannot be 
negative");
             }
 
             final byte[] serializedData = data.foreignValue() == null ? null : 
serializer.serialize(topic, data.foreignValue());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
index 9be5e39cc15..276600fd106 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
@@ -42,14 +42,10 @@ public class SubscriptionResponseWrapperSerdeTest {
         }
 
         @Override
-        public void configure(final Map<String, ?> configs, final boolean 
isKey) {
-
-        }
+        public void configure(final Map<String, ?> configs, final boolean 
isKey) { }
 
         @Override
-        public void close() {
-
-        }
+        public void close() { }
 
         @Override
         public Serializer<T> serializer() {
@@ -73,68 +69,95 @@ public class SubscriptionResponseWrapperSerdeTest {
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void ShouldSerdeWithNonNullsTest() {
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, 
(byte) 0x9A, (byte) 0xFF, (byte) 0x00});
         final String foreignValue = "foreignValue";
         final SubscriptionResponseWrapper<String> srw = new 
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
-        final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
-        final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
-        final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
+        try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+            final byte[] serResponse = srwSerde.serializer().serialize(null, 
srw);
+            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
 
-        assertArrayEquals(hashedValue, result.originalValueHash());
-        assertEquals(foreignValue, result.foreignValue());
-        assertNull(result.primaryPartition());
+            assertArrayEquals(hashedValue, result.originalValueHash());
+            assertEquals(foreignValue, result.foreignValue());
+            assertNull(result.primaryPartition());
+        }
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void shouldSerdeWithNullForeignValueTest() {
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, 
(byte) 0x9A, (byte) 0xFF, (byte) 0x00});
         final SubscriptionResponseWrapper<String> srw = new 
SubscriptionResponseWrapper<>(hashedValue, null, 1);
-        final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
-        final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
-        final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
+        try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+            final byte[] serResponse = srwSerde.serializer().serialize(null, 
srw);
+            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
 
-        assertArrayEquals(hashedValue, result.originalValueHash());
-        assertNull(result.foreignValue());
-        assertNull(result.primaryPartition());
+            assertArrayEquals(hashedValue, result.originalValueHash());
+            assertNull(result.foreignValue());
+            assertNull(result.primaryPartition());
+        }
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void shouldSerdeWithNullHashTest() {
         final long[] hashedValue = null;
         final String foreignValue = "foreignValue";
         final SubscriptionResponseWrapper<String> srw = new 
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
-        final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
-        final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
-        final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
+        try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+            final byte[] serResponse = srwSerde.serializer().serialize(null, 
srw);
+            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
 
-        assertArrayEquals(hashedValue, result.originalValueHash());
-        assertEquals(foreignValue, result.foreignValue());
-        assertNull(result.primaryPartition());
+            assertArrayEquals(hashedValue, result.originalValueHash());
+            assertEquals(foreignValue, result.foreignValue());
+            assertNull(result.primaryPartition());
+        }
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void shouldSerdeWithNullsTest() {
         final long[] hashedValue = null;
         final String foreignValue = null;
         final SubscriptionResponseWrapper<String> srw = new 
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
-        final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
-        final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
-        final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
+        try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+            final byte[] serResponse = srwSerde.serializer().serialize(null, 
srw);
+            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
 
-        assertArrayEquals(hashedValue, result.originalValueHash());
-        assertEquals(foreignValue, result.foreignValue());
-        assertNull(result.primaryPartition());
+            assertArrayEquals(hashedValue, result.originalValueHash());
+            assertEquals(foreignValue, result.foreignValue());
+            assertNull(result.primaryPartition());
+        }
     }
 
     @Test
     public void shouldThrowExceptionWithBadVersionTest() {
         final long[] hashedValue = null;
-        assertThrows(UnsupportedVersionException.class,
-            () -> new SubscriptionResponseWrapper<>(hashedValue, 
"foreignValue", (byte) 0xFF, 1));
+        assertThrows(
+            UnsupportedVersionException.class,
+            () -> new SubscriptionResponseWrapper<>(hashedValue, 
"foreignValue", (byte) -1, 1)
+        );
+    }
+
+    @Test
+    public void shouldThrowExceptionOnSerializeWhenDataVersionUnknown() {
+        final SubscriptionResponseWrapper<String> srw = new 
InvalidSubscriptionResponseWrapper(null, null, 1);
+        try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(null)) {
+            assertThrows(
+                UnsupportedVersionException.class,
+                () -> srwSerde.serializer().serialize(null, srw)
+            );
+        }
+    }
+
+    public static class InvalidSubscriptionResponseWrapper extends 
SubscriptionResponseWrapper<String> {
+
+        public InvalidSubscriptionResponseWrapper(final long[] 
originalValueHash,
+                                                  final String foreignValue,
+                                                  final Integer 
primaryPartition) {
+            super(originalValueHash, foreignValue, primaryPartition);
+        }
+
+        @Override
+        public byte version() {
+            return -1;
+        }
     }
 }
\ No newline at end of file

Reply via email to