This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 99e511c706b KAFKA-16288, KAFKA-16289: Fix Values convertToDecimal
exception and parseString corruption (#15399)
99e511c706b is described below
commit 99e511c706b7da08b559a3ff6a2c207cacd47b86
Author: Greg Harris <[email protected]>
AuthorDate: Mon Mar 4 15:59:47 2024 -0800
KAFKA-16288, KAFKA-16289: Fix Values convertToDecimal exception and
parseString corruption (#15399)
* KAFKA-16288: Prevent ClassCastExceptions for strings in
Values.convertToDecimal
* KAFKA-16289: Schemas inferred by Values for maps and arrays should ignore
element order
Signed-off-by: Greg Harris <[email protected]>
Reviewers: Chris Egerton <[email protected]>
---
.../java/org/apache/kafka/connect/data/Values.java | 20 +++-
.../org/apache/kafka/connect/data/ValuesTest.java | 110 +++++++++++++++++----
2 files changed, 108 insertions(+), 22 deletions(-)
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index cbaeeae66fb..7b78c64af0c 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -424,7 +424,7 @@ public class Values {
return BigDecimal.valueOf(converted);
}
if (value instanceof String) {
- return new BigDecimal(value.toString()).doubleValue();
+ return new BigDecimal(value.toString());
}
}
if (value instanceof ByteBuffer) {
@@ -802,11 +802,12 @@ public class Values {
try {
if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) {
List<Object> result = new ArrayList<>();
+ boolean compatible = true;
Schema elementSchema = null;
while (parser.hasNext()) {
if (parser.canConsume(ARRAY_END_DELIMITER)) {
Schema listSchema;
- if (elementSchema != null) {
+ if (elementSchema != null && compatible) {
listSchema =
SchemaBuilder.array(elementSchema).schema();
result = alignListEntriesWithSchema(listSchema,
result);
} else {
@@ -821,6 +822,9 @@ public class Values {
}
SchemaAndValue element = parse(parser, true);
elementSchema = commonSchemaFor(elementSchema, element);
+ if (elementSchema == null && element != null &&
element.schema() != null) {
+ compatible = false;
+ }
result.add(element != null ? element.value() : null);
int currentPosition = parser.mark();
@@ -840,15 +844,17 @@ public class Values {
if (parser.canConsume(MAP_BEGIN_DELIMITER)) {
Map<Object, Object> result = new LinkedHashMap<>();
+ boolean keyCompatible = true;
Schema keySchema = null;
+ boolean valueCompatible = true;
Schema valueSchema = null;
while (parser.hasNext()) {
if (parser.canConsume(MAP_END_DELIMITER)) {
Schema mapSchema;
- if (keySchema != null && valueSchema != null) {
+ if (keySchema != null && valueSchema != null &&
keyCompatible && valueCompatible) {
mapSchema = SchemaBuilder.map(keySchema,
valueSchema).build();
result =
alignMapKeysAndValuesWithSchema(mapSchema, result);
- } else if (keySchema != null) {
+ } else if (keySchema != null && keyCompatible) {
mapSchema =
SchemaBuilder.mapWithNullValues(keySchema);
result = alignMapKeysWithSchema(mapSchema, result);
} else {
@@ -876,7 +882,13 @@ public class Values {
parser.canConsume(COMMA_DELIMITER);
keySchema = commonSchemaFor(keySchema, key);
+ if (keySchema == null && key.schema() != null) {
+ keyCompatible = false;
+ }
valueSchema = commonSchemaFor(valueSchema, value);
+ if (valueSchema == null && value != null && value.schema()
!= null) {
+ valueCompatible = false;
+ }
}
// Missing either a comma or an end delimiter
if (COMMA_DELIMITER.equals(parser.previous())) {
diff --git
a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
index 3700a6ee4e6..abb6ea42214 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Timeout;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -363,33 +364,73 @@ public class ValuesTest {
}
/**
- * The parsed array has byte values and one int value, so we should return
list with single unified type of integers.
+ * We parse into different element types, but cannot infer a common
element schema.
+ * This behavior should be independent of the order that the elements
appear in the string
*/
@Test
- public void
shouldConvertStringOfListWithMixedElementTypesIntoListWithDifferentElementTypes()
{
- String str = "[1, 2, \"three\"]";
- List<?> list = Values.convertToList(Schema.STRING_SCHEMA, str);
- assertEquals(3, list.size());
- assertEquals(1, ((Number) list.get(0)).intValue());
- assertEquals(2, ((Number) list.get(1)).intValue());
- assertEquals("three", list.get(2));
+ public void shouldParseStringListWithMultipleElementTypes() {
+ assertParseStringArrayWithNoSchema(
+ Arrays.asList((byte) 1, (byte) 2, (short) 300, "four"),
+ "[1, 2, 300, \"four\"]");
+ assertParseStringArrayWithNoSchema(
+ Arrays.asList((byte) 2, (short) 300, "four", (byte) 1),
+ "[2, 300, \"four\", 1]");
+ assertParseStringArrayWithNoSchema(
+ Arrays.asList((short) 300, "four", (byte) 1, (byte) 2),
+ "[300, \"four\", 1, 2]");
+ assertParseStringArrayWithNoSchema(
+ Arrays.asList("four", (byte) 1, (byte) 2, (short) 300),
+ "[\"four\", 1, 2, 300]");
+ }
+
+ private void assertParseStringArrayWithNoSchema(List<Object> expected,
String str) {
+ SchemaAndValue result = Values.parseString(str);
+ assertEquals(Type.ARRAY, result.schema().type());
+ assertNull(result.schema().valueSchema());
+ List<?> list = (List<?>) result.value();
+ assertEquals(expected, list);
}
/**
- * We parse into different element types, but cannot infer a common
element schema.
+ * Maps with an inconsistent key type don't find a common type for the
keys or the values
+ * This behavior should be independent of the order that the pairs appear
in the string
+ */
+ @Test
+ public void shouldParseStringMapWithMultipleKeyTypes() {
+ Map<Object, Object> expected = new HashMap<>();
+ expected.put((byte) 1, (byte) 1);
+ expected.put((byte) 2, (byte) 1);
+ expected.put((short) 300, (short) 300);
+ expected.put("four", (byte) 1);
+ assertParseStringMapWithNoSchema(expected, "{1:1, 2:1, 300:300,
\"four\":1}");
+ assertParseStringMapWithNoSchema(expected, "{2:1, 300:300, \"four\":1,
1:1}");
+ assertParseStringMapWithNoSchema(expected, "{300:300, \"four\":1, 1:1,
2:1}");
+ assertParseStringMapWithNoSchema(expected, "{\"four\":1, 1:1, 2:1,
300:300}");
+ }
+
+ /**
+ * Maps with a consistent key type may still not have a common type for
the values
+ * This behavior should be independent of the order that the pairs appear
in the string
*/
@Test
- public void
shouldParseStringListWithMultipleElementTypesAndReturnListWithNoSchema() {
- String str = "[1, 2, 3, \"four\"]";
+ public void shouldParseStringMapWithMultipleValueTypes() {
+ Map<Object, Object> expected = new HashMap<>();
+ expected.put((short) 1, (byte) 1);
+ expected.put((short) 2, (byte) 1);
+ expected.put((short) 300, (short) 300);
+ expected.put((short) 4, "four");
+ assertParseStringMapWithNoSchema(expected, "{1:1, 2:1, 300:300,
4:\"four\"}");
+ assertParseStringMapWithNoSchema(expected, "{2:1, 300:300, 4:\"four\",
1:1}");
+ assertParseStringMapWithNoSchema(expected, "{300:300, 4:\"four\", 1:1,
2:1}");
+ assertParseStringMapWithNoSchema(expected, "{4:\"four\", 1:1, 2:1,
300:300}");
+ }
+
+ private void assertParseStringMapWithNoSchema(Map<Object, Object>
expected, String str) {
SchemaAndValue result = Values.parseString(str);
- assertEquals(Type.ARRAY, result.schema().type());
+ assertEquals(Type.MAP, result.schema().type());
assertNull(result.schema().valueSchema());
- List<?> list = (List<?>) result.value();
- assertEquals(4, list.size());
- assertEquals(1, ((Number) list.get(0)).intValue());
- assertEquals(2, ((Number) list.get(1)).intValue());
- assertEquals(3, ((Number) list.get(2)).intValue());
- assertEquals("four", list.get(3));
+ Map<?, ?> list = (Map<?, ?>) result.value();
+ assertEquals(expected, list);
}
/**
@@ -744,6 +785,39 @@ public class ValuesTest {
assertEquals(current, ts4);
}
+ @Test
+ public void shouldConvertDecimalValues() {
+ // Various forms of the same number should all be parsed to the same
BigDecimal
+ Number number = 1.0f;
+ String string = number.toString();
+ BigDecimal value = new BigDecimal(string);
+ byte[] bytes = Decimal.fromLogical(Decimal.schema(1), value);
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+
+ assertEquals(value, Values.convertToDecimal(null, number, 1));
+ assertEquals(value, Values.convertToDecimal(null, string, 1));
+ assertEquals(value, Values.convertToDecimal(null, value, 1));
+ assertEquals(value, Values.convertToDecimal(null, bytes, 1));
+ assertEquals(value, Values.convertToDecimal(null, buffer, 1));
+ }
+
+ /**
+ * Test parsing distinct number-like types (strings containing numbers,
and logical Decimals) in the same list
+ * The parser does not convert Numbers to Decimals, or Strings containing
numbers to Numbers automatically.
+ */
+ @Test
+ public void shouldNotConvertArrayValuesToDecimal() {
+ List<Object> decimals = Arrays.asList("\"1.0\"",
BigDecimal.valueOf(Long.MAX_VALUE).add(BigDecimal.ONE),
+ BigDecimal.valueOf(Long.MIN_VALUE).subtract(BigDecimal.ONE),
(byte) 1, (byte) 1);
+ List<Object> expected = new ArrayList<>(decimals); // most values are
directly reproduced with the same type
+ expected.set(0, "1.0"); // The quotes are parsed away, but the value
remains a string
+ SchemaAndValue schemaAndValue =
Values.parseString(decimals.toString());
+ Schema schema = schemaAndValue.schema();
+ assertEquals(Type.ARRAY, schema.type());
+ assertNull(schema.valueSchema());
+ assertEquals(expected, schemaAndValue.value());
+ }
+
@Test
public void canConsume() {
}