This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new e4b817fe6e8 KAFKA-15800: Prevent DataExceptions from corrupting KafkaOffsetBackingStore (#14718)
e4b817fe6e8 is described below
commit e4b817fe6e8413605f5a008d865a2ed314a70368
Author: Greg Harris <[email protected]>
AuthorDate: Fri Nov 10 08:40:16 2023 -0800
KAFKA-15800: Prevent DataExceptions from corrupting KafkaOffsetBackingStore (#14718)
Signed-off-by: Greg Harris <[email protected]>
Reviewers: Yash Mayya <[email protected]>
---
.../apache/kafka/connect/storage/OffsetUtils.java | 8 ++-
.../kafka/connect/storage/OffsetUtilsTest.java | 59 ++++++++++++----------
2 files changed, 38 insertions(+), 29 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
index 3a94ab2872d..1d6632e60cb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
@@ -89,7 +89,13 @@ public class OffsetUtils {
}
// The topic parameter is irrelevant for the JsonConverter which is
the internal converter used by
// Connect workers.
- Object deserializedKey = keyConverter.toConnectData("",
partitionKey).value();
+ Object deserializedKey;
+ try {
+ deserializedKey = keyConverter.toConnectData("",
partitionKey).value();
+ } catch (DataException e) {
+ log.warn("Ignoring offset partition key with unknown
serialization. Expected json.", e);
+ return;
+ }
if (!(deserializedKey instanceof List)) {
log.warn("Ignoring offset partition key with an unexpected format.
Expected type: {}, actual type: {}",
List.class.getName(), className(deserializedKey));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
index f5e6910bf54..06fb51d3ca6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.junit.Test;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -75,49 +76,51 @@ public class OffsetUtilsTest {
OffsetUtils.validateFormat(offsetData);
}
+ @Test
+ public void testProcessPartitionKeyWithUnknownSerialization() {
+ assertInvalidPartitionKey(
+ new byte[]{0},
+ "Ignoring offset partition key with unknown serialization");
+ assertInvalidPartitionKey(
+ "i-am-not-json".getBytes(StandardCharsets.UTF_8),
+ "Ignoring offset partition key with unknown serialization");
+ }
+
@Test
public void testProcessPartitionKeyNotList() {
- try (LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
- Map<String, Set<Map<String, Object>>> connectorPartitions = new
HashMap<>();
- OffsetUtils.processPartitionKey(serializePartitionKey(new
HashMap<>()), new byte[0], CONVERTER, connectorPartitions);
- // Expect no partition to be added to the map since the partition
key is of an invalid format
- assertEquals(0, connectorPartitions.size());
- assertEquals(1, logCaptureAppender.getMessages().size());
- assertThat(logCaptureAppender.getMessages().get(0),
- containsString("Ignoring offset partition key with an
unexpected format"));
- }
+ assertInvalidPartitionKey(
+ new byte[]{},
+ "Ignoring offset partition key with an unexpected format");
+ assertInvalidPartitionKey(
+ serializePartitionKey(new HashMap<>()),
+ "Ignoring offset partition key with an unexpected format");
}
@Test
public void testProcessPartitionKeyListWithOneElement() {
- try (LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
- Map<String, Set<Map<String, Object>>> connectorPartitions = new
HashMap<>();
-
OffsetUtils.processPartitionKey(serializePartitionKey(Collections.singletonList("")),
new byte[0], CONVERTER, connectorPartitions);
- // Expect no partition to be added to the map since the partition
key is of an invalid format
- assertEquals(0, connectorPartitions.size());
- assertEquals(1, logCaptureAppender.getMessages().size());
- assertThat(logCaptureAppender.getMessages().get(0),
- containsString("Ignoring offset partition key with an
unexpected number of elements"));
- }
+ assertInvalidPartitionKey(
+ serializePartitionKey(Collections.singletonList("")),
+ "Ignoring offset partition key with an unexpected number of
elements");
}
@Test
public void testProcessPartitionKeyListWithElementsOfWrongType() {
+ assertInvalidPartitionKey(
+ serializePartitionKey(Arrays.asList(1, new HashMap<>())),
+ "Ignoring offset partition key with an unexpected format for
the first element in the partition key list");
+ assertInvalidPartitionKey(
+ serializePartitionKey(Arrays.asList("connector-name", new
ArrayList<>())),
+ "Ignoring offset partition key with an unexpected format for
the second element in the partition key list");
+ }
+
+ public void assertInvalidPartitionKey(byte[] key, String message) {
try (LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
Map<String, Set<Map<String, Object>>> connectorPartitions = new
HashMap<>();
-
OffsetUtils.processPartitionKey(serializePartitionKey(Arrays.asList(1, new
HashMap<>())), new byte[0], CONVERTER, connectorPartitions);
+ OffsetUtils.processPartitionKey(key, new byte[0], CONVERTER,
connectorPartitions);
// Expect no partition to be added to the map since the partition
key is of an invalid format
assertEquals(0, connectorPartitions.size());
assertEquals(1, logCaptureAppender.getMessages().size());
- assertThat(logCaptureAppender.getMessages().get(0),
- containsString("Ignoring offset partition key with an
unexpected format for the first element in the partition key list"));
-
-
OffsetUtils.processPartitionKey(serializePartitionKey(Arrays.asList("connector-name",
new ArrayList<>())), new byte[0], CONVERTER, connectorPartitions);
- // Expect no partition to be added to the map since the partition
key is of an invalid format
- assertEquals(0, connectorPartitions.size());
- assertEquals(2, logCaptureAppender.getMessages().size());
- assertThat(logCaptureAppender.getMessages().get(1),
- containsString("Ignoring offset partition key with an
unexpected format for the second element in the partition key list"));
+ assertThat(logCaptureAppender.getMessages().get(0),
containsString(message));
}
}