This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new e4b817fe6e8 KAFKA-15800: Prevent DataExceptions from corrupting 
KafkaOffsetBackingStore (#14718)
e4b817fe6e8 is described below

commit e4b817fe6e8413605f5a008d865a2ed314a70368
Author: Greg Harris <[email protected]>
AuthorDate: Fri Nov 10 08:40:16 2023 -0800

    KAFKA-15800: Prevent DataExceptions from corrupting KafkaOffsetBackingStore 
(#14718)
    
    Signed-off-by: Greg Harris <[email protected]>
    
    Reviewers: Yash Mayya <[email protected]>
---
 .../apache/kafka/connect/storage/OffsetUtils.java  |  8 ++-
 .../kafka/connect/storage/OffsetUtilsTest.java     | 59 ++++++++++++----------
 2 files changed, 38 insertions(+), 29 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
index 3a94ab2872d..1d6632e60cb 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
@@ -89,7 +89,13 @@ public class OffsetUtils {
         }
         // The topic parameter is irrelevant for the JsonConverter which is 
the internal converter used by
         // Connect workers.
-        Object deserializedKey = keyConverter.toConnectData("", 
partitionKey).value();
+        Object deserializedKey;
+        try {
+            deserializedKey = keyConverter.toConnectData("", 
partitionKey).value();
+        } catch (DataException e) {
+            log.warn("Ignoring offset partition key with unknown 
serialization. Expected json.", e);
+            return;
+        }
         if (!(deserializedKey instanceof List)) {
             log.warn("Ignoring offset partition key with an unexpected format. 
Expected type: {}, actual type: {}",
                     List.class.getName(), className(deserializedKey));
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
index f5e6910bf54..06fb51d3ca6 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.junit.Test;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -75,49 +76,51 @@ public class OffsetUtilsTest {
         OffsetUtils.validateFormat(offsetData);
     }
 
+    @Test
+    public void testProcessPartitionKeyWithUnknownSerialization() {
+        assertInvalidPartitionKey(
+                new byte[]{0},
+                "Ignoring offset partition key with unknown serialization");
+        assertInvalidPartitionKey(
+                "i-am-not-json".getBytes(StandardCharsets.UTF_8),
+                "Ignoring offset partition key with unknown serialization");
+    }
+
     @Test
     public void testProcessPartitionKeyNotList() {
-        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
-            Map<String, Set<Map<String, Object>>> connectorPartitions = new 
HashMap<>();
-            OffsetUtils.processPartitionKey(serializePartitionKey(new 
HashMap<>()), new byte[0], CONVERTER, connectorPartitions);
-            // Expect no partition to be added to the map since the partition 
key is of an invalid format
-            assertEquals(0, connectorPartitions.size());
-            assertEquals(1, logCaptureAppender.getMessages().size());
-            assertThat(logCaptureAppender.getMessages().get(0),
-                    containsString("Ignoring offset partition key with an 
unexpected format"));
-        }
+        assertInvalidPartitionKey(
+                new byte[]{},
+                "Ignoring offset partition key with an unexpected format");
+        assertInvalidPartitionKey(
+                serializePartitionKey(new HashMap<>()),
+                "Ignoring offset partition key with an unexpected format");
     }
 
     @Test
     public void testProcessPartitionKeyListWithOneElement() {
-        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
-            Map<String, Set<Map<String, Object>>> connectorPartitions = new 
HashMap<>();
-            
OffsetUtils.processPartitionKey(serializePartitionKey(Collections.singletonList("")),
 new byte[0], CONVERTER, connectorPartitions);
-            // Expect no partition to be added to the map since the partition 
key is of an invalid format
-            assertEquals(0, connectorPartitions.size());
-            assertEquals(1, logCaptureAppender.getMessages().size());
-            assertThat(logCaptureAppender.getMessages().get(0),
-                    containsString("Ignoring offset partition key with an 
unexpected number of elements"));
-        }
+        assertInvalidPartitionKey(
+                serializePartitionKey(Collections.singletonList("")),
+                "Ignoring offset partition key with an unexpected number of 
elements");
     }
 
     @Test
     public void testProcessPartitionKeyListWithElementsOfWrongType() {
+        assertInvalidPartitionKey(
+                serializePartitionKey(Arrays.asList(1, new HashMap<>())),
+                "Ignoring offset partition key with an unexpected format for 
the first element in the partition key list");
+        assertInvalidPartitionKey(
+                serializePartitionKey(Arrays.asList("connector-name", new 
ArrayList<>())),
+                "Ignoring offset partition key with an unexpected format for 
the second element in the partition key list");
+    }
+
+    public void assertInvalidPartitionKey(byte[] key, String message) {
         try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
             Map<String, Set<Map<String, Object>>> connectorPartitions = new 
HashMap<>();
-            
OffsetUtils.processPartitionKey(serializePartitionKey(Arrays.asList(1, new 
HashMap<>())), new byte[0], CONVERTER, connectorPartitions);
+            OffsetUtils.processPartitionKey(key, new byte[0], CONVERTER, 
connectorPartitions);
             // Expect no partition to be added to the map since the partition 
key is of an invalid format
             assertEquals(0, connectorPartitions.size());
             assertEquals(1, logCaptureAppender.getMessages().size());
-            assertThat(logCaptureAppender.getMessages().get(0),
-                    containsString("Ignoring offset partition key with an 
unexpected format for the first element in the partition key list"));
-
-            
OffsetUtils.processPartitionKey(serializePartitionKey(Arrays.asList("connector-name",
 new ArrayList<>())), new byte[0], CONVERTER, connectorPartitions);
-            // Expect no partition to be added to the map since the partition 
key is of an invalid format
-            assertEquals(0, connectorPartitions.size());
-            assertEquals(2, logCaptureAppender.getMessages().size());
-            assertThat(logCaptureAppender.getMessages().get(1),
-                    containsString("Ignoring offset partition key with an 
unexpected format for the second element in the partition key list"));
+            assertThat(logCaptureAppender.getMessages().get(0), 
containsString(message));
         }
     }
 

Reply via email to