This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4d36a9e634170e968f37f650607802794eefc7ba Author: Chris Egerton <[email protected]> AuthorDate: Tue Jan 21 13:33:50 2020 -0600 KAFKA-9083: Various fixes/improvements for Connect's Values class (#7593) Author: Chris Egerton <[email protected]> Reviewers: Greg Harris <[email protected]>, Randall Hauch <[email protected]> --- .../apache/kafka/connect/data/SchemaBuilder.java | 20 +++ .../java/org/apache/kafka/connect/data/Values.java | 108 +++++++++---- .../org/apache/kafka/connect/data/ValuesTest.java | 167 +++++++++++++++++++-- .../connect/storage/SimpleHeaderConverterTest.java | 30 +++- 4 files changed, 278 insertions(+), 47 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java index fdcf05a..722f5fc 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java @@ -382,6 +382,26 @@ public class SchemaBuilder implements Schema { return builder; } + static SchemaBuilder arrayOfNull() { + return new SchemaBuilder(Type.ARRAY); + } + + static SchemaBuilder mapOfNull() { + return new SchemaBuilder(Type.MAP); + } + + static SchemaBuilder mapWithNullKeys(Schema valueSchema) { + SchemaBuilder result = new SchemaBuilder(Type.MAP); + result.valueSchema = valueSchema; + return result; + } + + static SchemaBuilder mapWithNullValues(Schema keySchema) { + SchemaBuilder result = new SchemaBuilder(Type.MAP); + result.keySchema = keySchema; + return result; + } + @Override public Schema keySchema() { return keySchema; diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java index c1bebdf..93c320a 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java @@ -723,14 +723,30 @@ public class Values { return new SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN); } + protected static boolean canParseSingleTokenLiteral(Parser parser, boolean embedded, String tokenLiteral) { + int startPosition = parser.mark(); + // If the next token is what we expect, then either... + if (parser.canConsume(tokenLiteral)) { + // ...we're reading an embedded value, in which case the next token will be handled appropriately + // by the caller if it's something like an end delimiter for a map or array, or a comma to + // separate multiple embedded values... + // ...or it's being parsed as part of a top-level string, in which case, any other tokens should + // cause use to stop parsing this single-token literal as such and instead just treat it like + // a string. For example, the top-level string "true}" will be tokenized as the tokens "true" and + // "}", but should ultimately be parsed as just the string "true}" instead of the boolean true. + if (embedded || !parser.hasNext()) { + return true; + } + } + parser.rewindTo(startPosition); + return false; + } + protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException { if (!parser.hasNext()) { return null; } if (embedded) { - if (parser.canConsume(NULL_VALUE)) { - return null; - } if (parser.canConsume(QUOTE_DELIMITER)) { StringBuilder sb = new StringBuilder(); while (parser.hasNext()) { @@ -742,34 +758,51 @@ public class Values { return new SchemaAndValue(Schema.STRING_SCHEMA, sb.toString()); } } - if (parser.canConsume(TRUE_LITERAL)) { + + if (canParseSingleTokenLiteral(parser, embedded, NULL_VALUE)) { + return null; + } + if (canParseSingleTokenLiteral(parser, embedded, TRUE_LITERAL)) { return TRUE_SCHEMA_AND_VALUE; } - if (parser.canConsume(FALSE_LITERAL)) { + if (canParseSingleTokenLiteral(parser, embedded, FALSE_LITERAL)) { return FALSE_SCHEMA_AND_VALUE; } + int startPosition = parser.mark(); + try { if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) { List<Object> result = new ArrayList<>(); Schema elementSchema = null; while (parser.hasNext()) { if (parser.canConsume(ARRAY_END_DELIMITER)) { - Schema listSchema = null; + Schema listSchema; if (elementSchema != null) { listSchema = SchemaBuilder.array(elementSchema).schema(); + result = alignListEntriesWithSchema(listSchema, result); + } else { + // Every value is null + listSchema = SchemaBuilder.arrayOfNull().build(); } - result = alignListEntriesWithSchema(listSchema, result); return new SchemaAndValue(listSchema, result); } + if (parser.canConsume(COMMA_DELIMITER)) { throw new DataException("Unable to parse an empty array element: " + parser.original()); } SchemaAndValue element = parse(parser, true); elementSchema = commonSchemaFor(elementSchema, element); - result.add(element.value()); - parser.canConsume(COMMA_DELIMITER); + result.add(element != null ? element.value() : null); + + int currentPosition = parser.mark(); + if (parser.canConsume(ARRAY_END_DELIMITER)) { + parser.rewindTo(currentPosition); + } else if (!parser.canConsume(COMMA_DELIMITER)) { + throw new DataException("Array elements missing '" + COMMA_DELIMITER + "' delimiter"); + } } + // Missing either a comma or an end delimiter if (COMMA_DELIMITER.equals(parser.previous())) { throw new DataException("Array is missing element after ',': " + parser.original()); @@ -783,26 +816,34 @@ public class Values { Schema valueSchema = null; while (parser.hasNext()) { if (parser.canConsume(MAP_END_DELIMITER)) { - Schema mapSchema = null; + Schema mapSchema; if (keySchema != null && valueSchema != null) { - mapSchema = SchemaBuilder.map(keySchema, valueSchema).schema(); + mapSchema = SchemaBuilder.map(keySchema, valueSchema).build(); + result = alignMapKeysAndValuesWithSchema(mapSchema, result); + } else if (keySchema != null) { + mapSchema = SchemaBuilder.mapWithNullValues(keySchema); + result = alignMapKeysWithSchema(mapSchema, result); + } else { + mapSchema = SchemaBuilder.mapOfNull().build(); } - result = alignMapKeysAndValuesWithSchema(mapSchema, result); return new SchemaAndValue(mapSchema, result); } + if (parser.canConsume(COMMA_DELIMITER)) { - throw new DataException("Unable to parse a map entry has no key or value: " + parser.original()); + throw new DataException("Unable to parse a map entry with no key or value: " + parser.original()); } SchemaAndValue key = parse(parser, true); if (key == null || key.value() == null) { throw new DataException("Map entry may not have a null key: " + parser.original()); } + if (!parser.canConsume(ENTRY_DELIMITER)) { throw new DataException("Map entry is missing '=': " + parser.original()); } SchemaAndValue value = parse(parser, true); Object entryValue = value != null ? value.value() : null; result.put(key.value(), entryValue); + parser.canConsume(COMMA_DELIMITER); keySchema = commonSchemaFor(keySchema, key); valueSchema = commonSchemaFor(valueSchema, value); @@ -811,14 +852,19 @@ public class Values { if (COMMA_DELIMITER.equals(parser.previous())) { throw new DataException("Map is missing element after ',': " + parser.original()); } - throw new DataException("Map is missing terminating ']': " + parser.original()); + throw new DataException("Map is missing terminating '}': " + parser.original()); } } catch (DataException e) { - LOG.debug("Unable to parse the value as a map; reverting to string", e); + LOG.debug("Unable to parse the value as a map or an array; reverting to string", e); parser.rewindTo(startPosition); } - String token = parser.next().trim(); - assert !token.isEmpty(); // original can be empty string but is handled right away; no way for token to be empty here + + String token = parser.next(); + if (token.trim().isEmpty()) { + return new SchemaAndValue(Schema.STRING_SCHEMA, token); + } + token = token.trim(); + char firstChar = token.charAt(0); boolean firstCharIsDigit = Character.isDigit(firstChar); if (firstCharIsDigit || firstChar == '+' || firstChar == '-') { @@ -878,6 +924,9 @@ public class Values { } } } + if (embedded) { + throw new DataException("Failed to parse embedded value"); + } // At this point, the only thing this can be is a string. Embedded strings were processed above, // so this is not embedded and we can use the original string... return new SchemaAndValue(Schema.STRING_SCHEMA, parser.original()); @@ -953,9 +1002,6 @@ public class Values { } protected static List<Object> alignListEntriesWithSchema(Schema schema, List<Object> input) { - if (schema == null) { - return input; - } Schema valueSchema = schema.valueSchema(); List<Object> result = new ArrayList<>(); for (Object value : input) { @@ -966,9 +1012,6 @@ public class Values { } protected static Map<Object, Object> alignMapKeysAndValuesWithSchema(Schema mapSchema, Map<Object, Object> input) { - if (mapSchema == null) { - return input; - } Schema keySchema = mapSchema.keySchema(); Schema valueSchema = mapSchema.valueSchema(); Map<Object, Object> result = new LinkedHashMap<>(); @@ -980,6 +1023,16 @@ public class Values { return result; } + protected static Map<Object, Object> alignMapKeysWithSchema(Schema mapSchema, Map<Object, Object> input) { + Schema keySchema = mapSchema.keySchema(); + Map<Object, Object> result = new LinkedHashMap<>(); + for (Map.Entry<?, ?> entry : input.entrySet()) { + Object newKey = convertTo(keySchema, null, entry.getKey()); + result.put(newKey, entry.getValue()); + } + return result; + } + protected static class SchemaDetector { private Type knownType = null; private boolean optional = false; @@ -1123,12 +1176,13 @@ public class Values { nextToken = consumeNextToken(); } if (ignoreLeadingAndTrailingWhitespace) { - nextToken = nextToken.trim(); - while (nextToken.isEmpty() && canConsumeNextToken()) { - nextToken = consumeNextToken().trim(); + while (nextToken.trim().isEmpty() && canConsumeNextToken()) { + nextToken = consumeNextToken(); } } - return nextToken.equals(expected); + return ignoreLeadingAndTrailingWhitespace + ? nextToken.trim().equals(expected) + : nextToken.equals(expected); } } } diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java index 162222d..a5909f3 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java @@ -22,6 +22,9 @@ import org.apache.kafka.connect.errors.DataException; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -35,6 +38,8 @@ import static org.junit.Assert.fail; public class ValuesTest { + private static final String WHITESPACE = "\n \t \t\n"; + private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000; private static final Map<String, String> STRING_MAP = new LinkedHashMap<>(); @@ -68,6 +73,149 @@ public class ValuesTest { } @Test + public void shouldNotParseUnquotedEmbeddedMapKeysAsStrings() { + SchemaAndValue schemaAndValue = Values.parseString("{foo: 3}"); + assertEquals(Type.STRING, schemaAndValue.schema().type()); + assertEquals("{foo: 3}", schemaAndValue.value()); + } + + @Test + public void shouldNotParseUnquotedEmbeddedMapValuesAsStrings() { + SchemaAndValue schemaAndValue = Values.parseString("{3: foo}"); + assertEquals(Type.STRING, schemaAndValue.schema().type()); + assertEquals("{3: foo}", schemaAndValue.value()); + } + + @Test + public void shouldNotParseUnquotedArrayElementsAsStrings() { + SchemaAndValue schemaAndValue = Values.parseString("[foo]"); + assertEquals(Type.STRING, schemaAndValue.schema().type()); + assertEquals("[foo]", schemaAndValue.value()); + } + + @Test + public void shouldNotParseStringsBeginningWithNullAsStrings() { + SchemaAndValue schemaAndValue = Values.parseString("null="); + assertEquals(Type.STRING, schemaAndValue.schema().type()); + assertEquals("null=", schemaAndValue.value()); + } + + @Test + public void shouldParseStringsBeginningWithTrueAsStrings() { + SchemaAndValue schemaAndValue = Values.parseString("true}"); + assertEquals(Type.STRING, schemaAndValue.schema().type()); + assertEquals("true}", schemaAndValue.value()); + } + + @Test + public void shouldParseStringsBeginningWithFalseAsStrings() { + SchemaAndValue schemaAndValue = Values.parseString("false]"); + assertEquals(Type.STRING, schemaAndValue.schema().type()); + assertEquals("false]", schemaAndValue.value()); + } + + @Test + public void shouldParseTrueAsBooleanIfSurroundedByWhitespace() { + SchemaAndValue schemaAndValue = Values.parseString(WHITESPACE + "true" + WHITESPACE); + assertEquals(Type.BOOLEAN, schemaAndValue.schema().type()); + assertEquals(true, schemaAndValue.value()); + } + + @Test + public void shouldParseFalseAsBooleanIfSurroundedByWhitespace() { + SchemaAndValue schemaAndValue = Values.parseString(WHITESPACE + "false" + WHITESPACE); + assertEquals(Type.BOOLEAN, schemaAndValue.schema().type()); + assertEquals(false, schemaAndValue.value()); + } + + @Test + public void shouldParseNullAsNullIfSurroundedByWhitespace() { + SchemaAndValue schemaAndValue = Values.parseString(WHITESPACE + "null" + WHITESPACE); + assertNull(schemaAndValue); + } + + @Test + public void shouldParseBooleanLiteralsEmbeddedInArray() { + SchemaAndValue schemaAndValue = Values.parseString("[true, false]"); + assertEquals(Type.ARRAY, schemaAndValue.schema().type()); + assertEquals(Type.BOOLEAN, schemaAndValue.schema().valueSchema().type()); + assertEquals(Arrays.asList(true, false), schemaAndValue.value()); + } + + @Test + public void shouldParseBooleanLiteralsEmbeddedInMap() { + SchemaAndValue schemaAndValue = Values.parseString("{true: false, false: true}"); + assertEquals(Type.MAP, schemaAndValue.schema().type()); + assertEquals(Type.BOOLEAN, schemaAndValue.schema().keySchema().type()); + assertEquals(Type.BOOLEAN, schemaAndValue.schema().valueSchema().type()); + Map<Boolean, Boolean> expectedValue = new HashMap<>(); + expectedValue.put(true, false); + expectedValue.put(false, true); + assertEquals(expectedValue, schemaAndValue.value()); + } + + @Test + public void shouldNotParseAsMapWithoutCommas() { + SchemaAndValue schemaAndValue = Values.parseString("{6:9 4:20}"); + assertEquals(Type.STRING, schemaAndValue.schema().type()); + assertEquals("{6:9 4:20}", schemaAndValue.value()); + } + + @Test + public void shouldNotParseAsArrayWithoutCommas() { + SchemaAndValue schemaAndValue = Values.parseString("[0 1 2]"); + assertEquals(Type.STRING, schemaAndValue.schema().type()); + assertEquals("[0 1 2]", schemaAndValue.value()); + } + + @Test + public void shouldParseEmptyMap() { + SchemaAndValue schemaAndValue = Values.parseString("{}"); + assertEquals(Type.MAP, schemaAndValue.schema().type()); + assertEquals(Collections.emptyMap(), schemaAndValue.value()); + } + + @Test + public void shouldParseEmptyArray() { + SchemaAndValue schemaAndValue = Values.parseString("[]"); + assertEquals(Type.ARRAY, schemaAndValue.schema().type()); + assertEquals(Collections.emptyList(), schemaAndValue.value()); + } + + @Test + public void shouldNotParseAsMapWithNullKeys() { + SchemaAndValue schemaAndValue = Values.parseString("{null: 3}"); + assertEquals(Type.STRING, schemaAndValue.schema().type()); + assertEquals("{null: 3}", schemaAndValue.value()); + } + + @Test + public void shouldParseNull() { + SchemaAndValue schemaAndValue = Values.parseString("null"); + assertNull(schemaAndValue); + } + + @Test + public void shouldConvertStringOfNull() { + assertRoundTrip(Schema.STRING_SCHEMA, "null"); + } + + @Test + public void shouldParseNullMapValues() { + SchemaAndValue schemaAndValue = Values.parseString("{3: null}"); + assertEquals(Type.MAP, schemaAndValue.schema().type()); + assertEquals(Type.INT8, schemaAndValue.schema().keySchema().type()); + assertEquals(Collections.singletonMap((byte) 3, null), schemaAndValue.value()); + } + + @Test + public void shouldParseNullArrayElements() { + SchemaAndValue schemaAndValue = Values.parseString("[null]"); + assertEquals(Type.ARRAY, schemaAndValue.schema().type()); + assertEquals(Collections.singletonList(null), schemaAndValue.value()); + } + + @Test public void shouldEscapeStringsWithEmbeddedQuotesAndBackslashes() { String original = "three\"blind\\\"mice"; String expected = "three\\\"blind\\\\\\\"mice"; @@ -214,7 +362,8 @@ public class ValuesTest { public void shouldParseStringListWithMultipleElementTypesAndReturnListWithNoSchema() { String str = "[1, 2, 3, \"four\"]"; SchemaAndValue result = Values.parseString(str); - assertNull(result.schema()); + assertEquals(Type.ARRAY, result.schema().type()); + assertNull(result.schema().valueSchema()); List<?> list = (List<?>) result.value(); assertEquals(4, list.size()); assertEquals(1, ((Number) list.get(0)).intValue()); @@ -256,7 +405,7 @@ public class ValuesTest { */ @Test(expected = DataException.class) public void shouldFailToParseStringOfMapWithIntValuesWithBlankEntry() { - Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" : 1234567890 ,, \"bar\" : 0, \"baz\" : -987654321 } "); + Values.convertToMap(Schema.STRING_SCHEMA, " { \"foo\" : 1234567890 ,, \"bar\" : 0, \"baz\" : -987654321 } "); } /** @@ -264,7 +413,7 @@ public class ValuesTest { */ @Test(expected = DataException.class) public void shouldFailToParseStringOfMalformedMap() { - Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" : 1234567890 , \"a\", \"bar\" : 0, \"baz\" : -987654321 } "); + Values.convertToMap(Schema.STRING_SCHEMA, " { \"foo\" : 1234567890 , \"a\", \"bar\" : 0, \"baz\" : -987654321 } "); } /** @@ -272,7 +421,7 @@ public class ValuesTest { */ @Test(expected = DataException.class) public void shouldFailToParseStringOfMapWithIntValuesWithOnlyBlankEntries() { - Values.convertToList(Schema.STRING_SCHEMA, " { ,, , , } "); + Values.convertToMap(Schema.STRING_SCHEMA, " { ,, , , } "); } /** @@ -280,15 +429,7 @@ public class ValuesTest { */ @Test(expected = DataException.class) public void shouldFailToParseStringOfMapWithIntValuesWithBlankEntries() { - Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" : \"1234567890\" ,, \"bar\" : \"0\", \"baz\" : \"boz\" } "); - } - - /** - * Schema for Map requires a schema for key and value, but we have no key or value and Connect has no "any" type - */ - @Test(expected = DataException.class) - public void shouldFailToParseStringOfEmptyMap() { - Values.convertToList(Schema.STRING_SCHEMA, " { } "); + Values.convertToMap(Schema.STRING_SCHEMA, " { \"foo\" : \"1234567890\" ,, \"bar\" : \"0\", \"baz\" : \"boz\" } "); } @Test diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java index fdfdd32..58a17f5 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java @@ -164,11 +164,15 @@ public class SimpleHeaderConverterTest { } @Test - public void shouldConvertMapWithStringKeysAndMixedValuesToMapWithoutSchema() { + public void shouldConvertMapWithStringKeysAndMixedValuesToMap() { Map<String, Object> map = new LinkedHashMap<>(); map.put("foo", "bar"); map.put("baz", (short) 3456); - assertRoundTrip(null, map); + SchemaAndValue result = roundTrip(null, map); + assertEquals(Schema.Type.MAP, result.schema().type()); + assertEquals(Schema.Type.STRING, result.schema().keySchema().type()); + assertNull(result.schema().valueSchema()); + assertEquals(map, result.value()); } @Test @@ -176,17 +180,29 @@ public class SimpleHeaderConverterTest { List<Object> list = new ArrayList<>(); list.add("foo"); list.add((short) 13344); - assertRoundTrip(null, list); + SchemaAndValue result = roundTrip(null, list); + assertEquals(Schema.Type.ARRAY, result.schema().type()); + assertNull(result.schema().valueSchema()); + assertEquals(list, result.value()); } @Test - public void shouldConvertEmptyMapToMapWithoutSchema() { - assertRoundTrip(null, new LinkedHashMap<>()); + public void shouldConvertEmptyMapToMap() { + Map<Object, Object> map = new LinkedHashMap<>(); + SchemaAndValue result = roundTrip(null, map); + assertEquals(Schema.Type.MAP, result.schema().type()); + assertNull(result.schema().keySchema()); + assertNull(result.schema().valueSchema()); + assertEquals(map, result.value()); } @Test - public void shouldConvertEmptyListToListWithoutSchema() { - assertRoundTrip(null, new ArrayList<>()); + public void shouldConvertEmptyListToList() { + List<Object> list = new ArrayList<>(); + SchemaAndValue result = roundTrip(null, list); + assertEquals(Schema.Type.ARRAY, result.schema().type()); + assertNull(result.schema().valueSchema()); + assertEquals(list, result.value()); } protected SchemaAndValue roundTrip(Schema schema, Object input) {
