This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 99e511c706b KAFKA-16288, KAFKA-16289: Fix Values convertToDecimal exception and parseString corruption (#15399)
99e511c706b is described below

commit 99e511c706b7da08b559a3ff6a2c207cacd47b86
Author: Greg Harris <[email protected]>
AuthorDate: Mon Mar 4 15:59:47 2024 -0800

    KAFKA-16288, KAFKA-16289: Fix Values convertToDecimal exception and parseString corruption (#15399)
    
    * KAFKA-16288: Prevent ClassCastExceptions for strings in Values.convertToDecimal
    * KAFKA-16289: Values inferred schemas for map and arrays should ignore element order
    
    Signed-off-by: Greg Harris <[email protected]>
    Reviewers: Chris Egerton <[email protected]>
---
 .../java/org/apache/kafka/connect/data/Values.java |  20 +++-
 .../org/apache/kafka/connect/data/ValuesTest.java  | 110 +++++++++++++++++----
 2 files changed, 108 insertions(+), 22 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index cbaeeae66fb..7b78c64af0c 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -424,7 +424,7 @@ public class Values {
                         return BigDecimal.valueOf(converted);
                     }
                     if (value instanceof String) {
-                        return new BigDecimal(value.toString()).doubleValue();
+                        return new BigDecimal(value.toString());
                     }
                 }
                 if (value instanceof ByteBuffer) {
@@ -802,11 +802,12 @@ public class Values {
         try {
             if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) {
                 List<Object> result = new ArrayList<>();
+                boolean compatible = true;
                 Schema elementSchema = null;
                 while (parser.hasNext()) {
                     if (parser.canConsume(ARRAY_END_DELIMITER)) {
                         Schema listSchema;
-                        if (elementSchema != null) {
+                        if (elementSchema != null && compatible) {
                             listSchema = 
SchemaBuilder.array(elementSchema).schema();
                             result = alignListEntriesWithSchema(listSchema, 
result);
                         } else {
@@ -821,6 +822,9 @@ public class Values {
                     }
                     SchemaAndValue element = parse(parser, true);
                     elementSchema = commonSchemaFor(elementSchema, element);
+                    if (elementSchema == null && element != null && 
element.schema() != null) {
+                        compatible = false;
+                    }
                     result.add(element != null ? element.value() : null);
 
                     int currentPosition = parser.mark();
@@ -840,15 +844,17 @@ public class Values {
 
             if (parser.canConsume(MAP_BEGIN_DELIMITER)) {
                 Map<Object, Object> result = new LinkedHashMap<>();
+                boolean keyCompatible = true;
                 Schema keySchema = null;
+                boolean valueCompatible = true;
                 Schema valueSchema = null;
                 while (parser.hasNext()) {
                     if (parser.canConsume(MAP_END_DELIMITER)) {
                         Schema mapSchema;
-                        if (keySchema != null && valueSchema != null) {
+                        if (keySchema != null && valueSchema != null && 
keyCompatible && valueCompatible) {
                             mapSchema = SchemaBuilder.map(keySchema, 
valueSchema).build();
                             result = 
alignMapKeysAndValuesWithSchema(mapSchema, result);
-                        } else if (keySchema != null) {
+                        } else if (keySchema != null && keyCompatible) {
                             mapSchema = 
SchemaBuilder.mapWithNullValues(keySchema);
                             result = alignMapKeysWithSchema(mapSchema, result);
                         } else {
@@ -876,7 +882,13 @@ public class Values {
 
                     parser.canConsume(COMMA_DELIMITER);
                     keySchema = commonSchemaFor(keySchema, key);
+                    if (keySchema == null && key.schema() != null) {
+                        keyCompatible = false;
+                    }
                     valueSchema = commonSchemaFor(valueSchema, value);
+                    if (valueSchema == null && value != null && value.schema() 
!= null) {
+                        valueCompatible = false;
+                    }
                 }
                 // Missing either a comma or an end delimiter
                 if (COMMA_DELIMITER.equals(parser.previous())) {
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
index 3700a6ee4e6..abb6ea42214 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Timeout;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -363,33 +364,73 @@ public class ValuesTest {
     }
 
     /**
-     * The parsed array has byte values and one int value, so we should return 
list with single unified type of integers.
+     * We parse into different element types, but cannot infer a common 
element schema.
+     * This behavior should be independent of the order that the elements 
appear in the string
      */
     @Test
-    public void 
shouldConvertStringOfListWithMixedElementTypesIntoListWithDifferentElementTypes()
 {
-        String str = "[1, 2, \"three\"]";
-        List<?> list = Values.convertToList(Schema.STRING_SCHEMA, str);
-        assertEquals(3, list.size());
-        assertEquals(1, ((Number) list.get(0)).intValue());
-        assertEquals(2, ((Number) list.get(1)).intValue());
-        assertEquals("three", list.get(2));
+    public void shouldParseStringListWithMultipleElementTypes() {
+        assertParseStringArrayWithNoSchema(
+                Arrays.asList((byte) 1, (byte) 2, (short) 300, "four"),
+                "[1, 2, 300, \"four\"]");
+        assertParseStringArrayWithNoSchema(
+                Arrays.asList((byte) 2, (short) 300, "four", (byte) 1),
+                "[2, 300, \"four\", 1]");
+        assertParseStringArrayWithNoSchema(
+                Arrays.asList((short) 300, "four", (byte) 1, (byte) 2),
+                "[300, \"four\", 1, 2]");
+        assertParseStringArrayWithNoSchema(
+                Arrays.asList("four", (byte) 1, (byte) 2, (short) 300),
+                "[\"four\", 1, 2, 300]");
+    }
+
+    private void assertParseStringArrayWithNoSchema(List<Object> expected, 
String str) {
+        SchemaAndValue result = Values.parseString(str);
+        assertEquals(Type.ARRAY, result.schema().type());
+        assertNull(result.schema().valueSchema());
+        List<?> list = (List<?>) result.value();
+        assertEquals(expected, list);
     }
 
     /**
-     * We parse into different element types, but cannot infer a common 
element schema.
+     * Maps with an inconsistent key type don't find a common type for the 
keys or the values
+     * This behavior should be independent of the order that the pairs appear 
in the string
+     */
+    @Test
+    public void shouldParseStringMapWithMultipleKeyTypes() {
+        Map<Object, Object> expected = new HashMap<>();
+        expected.put((byte) 1, (byte) 1);
+        expected.put((byte) 2, (byte) 1);
+        expected.put((short) 300, (short) 300);
+        expected.put("four", (byte) 1);
+        assertParseStringMapWithNoSchema(expected, "{1:1, 2:1, 300:300, 
\"four\":1}");
+        assertParseStringMapWithNoSchema(expected, "{2:1, 300:300, \"four\":1, 
1:1}");
+        assertParseStringMapWithNoSchema(expected, "{300:300, \"four\":1, 1:1, 
2:1}");
+        assertParseStringMapWithNoSchema(expected, "{\"four\":1, 1:1, 2:1, 
300:300}");
+    }
+
+    /**
+     * Maps with a consistent key type may still not have a common type for 
the values
+     * This behavior should be independent of the order that the pairs appear 
in the string
      */
     @Test
-    public void 
shouldParseStringListWithMultipleElementTypesAndReturnListWithNoSchema() {
-        String str = "[1, 2, 3, \"four\"]";
+    public void shouldParseStringMapWithMultipleValueTypes() {
+        Map<Object, Object> expected = new HashMap<>();
+        expected.put((short) 1, (byte) 1);
+        expected.put((short) 2, (byte) 1);
+        expected.put((short) 300, (short) 300);
+        expected.put((short) 4, "four");
+        assertParseStringMapWithNoSchema(expected, "{1:1, 2:1, 300:300, 
4:\"four\"}");
+        assertParseStringMapWithNoSchema(expected, "{2:1, 300:300, 4:\"four\", 
1:1}");
+        assertParseStringMapWithNoSchema(expected, "{300:300, 4:\"four\", 1:1, 
2:1}");
+        assertParseStringMapWithNoSchema(expected, "{4:\"four\", 1:1, 2:1, 
300:300}");
+    }
+
+    private void assertParseStringMapWithNoSchema(Map<Object, Object> 
expected, String str) {
         SchemaAndValue result = Values.parseString(str);
-        assertEquals(Type.ARRAY, result.schema().type());
+        assertEquals(Type.MAP, result.schema().type());
         assertNull(result.schema().valueSchema());
-        List<?> list = (List<?>) result.value();
-        assertEquals(4, list.size());
-        assertEquals(1, ((Number) list.get(0)).intValue());
-        assertEquals(2, ((Number) list.get(1)).intValue());
-        assertEquals(3, ((Number) list.get(2)).intValue());
-        assertEquals("four", list.get(3));
+        Map<?, ?> list = (Map<?, ?>) result.value();
+        assertEquals(expected, list);
     }
 
     /**
@@ -744,6 +785,39 @@ public class ValuesTest {
         assertEquals(current, ts4);
     }
 
+    @Test
+    public void shouldConvertDecimalValues() {
+        // Various forms of the same number should all be parsed to the same 
BigDecimal
+        Number number = 1.0f;
+        String string = number.toString();
+        BigDecimal value = new BigDecimal(string);
+        byte[] bytes = Decimal.fromLogical(Decimal.schema(1), value);
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+
+        assertEquals(value, Values.convertToDecimal(null, number, 1));
+        assertEquals(value, Values.convertToDecimal(null, string, 1));
+        assertEquals(value, Values.convertToDecimal(null, value, 1));
+        assertEquals(value, Values.convertToDecimal(null, bytes, 1));
+        assertEquals(value, Values.convertToDecimal(null, buffer, 1));
+    }
+
+    /**
+     * Test parsing distinct number-like types (strings containing numbers, 
and logical Decimals) in the same list
+     * The parser does not convert Numbers to Decimals, or Strings containing 
numbers to Numbers automatically.
+     */
+    @Test
+    public void shouldNotConvertArrayValuesToDecimal() {
+        List<Object> decimals = Arrays.asList("\"1.0\"", 
BigDecimal.valueOf(Long.MAX_VALUE).add(BigDecimal.ONE),
+                BigDecimal.valueOf(Long.MIN_VALUE).subtract(BigDecimal.ONE), 
(byte) 1, (byte) 1);
+        List<Object> expected = new ArrayList<>(decimals); // most values are 
directly reproduced with the same type
+        expected.set(0, "1.0"); // The quotes are parsed away, but the value 
remains a string
+        SchemaAndValue schemaAndValue = 
Values.parseString(decimals.toString());
+        Schema schema = schemaAndValue.schema();
+        assertEquals(Type.ARRAY, schema.type());
+        assertNull(schema.valueSchema());
+        assertEquals(expected, schemaAndValue.value());
+    }
+
     @Test
     public void canConsume() {
     }

Reply via email to