This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new b847dac KAFKA-9074: Correct Connect’s `Values.parseString` to
properly parse a time and timestamp literal (#7568)
b847dac is described below
commit b847dace7d6504a0c950bf28a5c3c95202baa4c3
Author: Randall Hauch <[email protected]>
AuthorDate: Tue Feb 4 11:15:04 2020 -0600
KAFKA-9074: Correct Connect’s `Values.parseString` to properly parse a time
and timestamp literal (#7568)
* KAFKA-9074: Correct Connect’s `Values.parseString` to properly parse a
time and timestamp literal
Time and timestamp literal strings contain a `:` character, but the
internal parser used in the `Values.parseString(String)` method tokenizes on
the colon character to tokenize and parse map entries. The colon could be
escaped, but then the backslash character used to escape the colon is not
removed and the parser fails to match the literal as a time or timestamp value.
This fix corrects the parsing logic to properly parse timestamp and time
literal strings whose colon characters are either escaped or unescaped.
Additional unit tests were added to first verify the incorrect behavior and
then to validate the correction.
Author: Randall Hauch <[email protected]>
Reviewers: Chris Egerton <[email protected]>, Nigel Liang
<[email protected]>, Jason Gustafson <[email protected]>
---
.../java/org/apache/kafka/connect/data/Values.java | 133 +++++++++++++-----
.../org/apache/kafka/connect/data/ValuesTest.java | 155 +++++++++++++++++++++
2 files changed, 257 insertions(+), 31 deletions(-)
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index 93c320a..d99fbca 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -30,13 +30,17 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.text.StringCharacterIterator;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Set;
import java.util.TimeZone;
import java.util.regex.Pattern;
@@ -70,9 +74,18 @@ public class Values {
private static final String FALSE_LITERAL = Boolean.FALSE.toString();
private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
private static final String NULL_VALUE = "null";
- private static final String ISO_8601_DATE_FORMAT_PATTERN = "yyyy-MM-dd";
- private static final String ISO_8601_TIME_FORMAT_PATTERN =
"HH:mm:ss.SSS'Z'";
- private static final String ISO_8601_TIMESTAMP_FORMAT_PATTERN =
ISO_8601_DATE_FORMAT_PATTERN + "'T'" + ISO_8601_TIME_FORMAT_PATTERN;
+ static final String ISO_8601_DATE_FORMAT_PATTERN = "yyyy-MM-dd";
+ static final String ISO_8601_TIME_FORMAT_PATTERN = "HH:mm:ss.SSS'Z'";
+ static final String ISO_8601_TIMESTAMP_FORMAT_PATTERN =
ISO_8601_DATE_FORMAT_PATTERN + "'T'" + ISO_8601_TIME_FORMAT_PATTERN;
+ private static final Set<String> TEMPORAL_LOGICAL_TYPE_NAMES =
+ Collections.unmodifiableSet(
+ new HashSet<>(
+ Arrays.asList(Time.LOGICAL_NAME,
+ Timestamp.LOGICAL_NAME,
+ Date.LOGICAL_NAME
+ )
+ )
+ );
private static final String QUOTE_DELIMITER = "\"";
private static final String COMMA_DELIMITER = ",";
@@ -467,6 +480,9 @@ public class Values {
int days = (int) (millis / MILLIS_PER_DAY); //
truncates
return Date.toLogical(toSchema, days);
}
+ } else {
+ // There is no fromSchema, so no conversion is
needed
+ return value;
}
}
long numeric = asLong(value, fromSchema, null);
@@ -492,6 +508,9 @@ public class Values {
calendar.set(Calendar.DAY_OF_MONTH, 1);
return Time.toLogical(toSchema, (int)
calendar.getTimeInMillis());
}
+ } else {
+ // There is no fromSchema, so no conversion is
needed
+ return value;
}
}
long numeric = asLong(value, fromSchema, null);
@@ -523,6 +542,9 @@ public class Values {
if (Timestamp.LOGICAL_NAME.equals(fromSchemaName))
{
return value;
}
+ } else {
+ // There is no fromSchema, so no conversion is
needed
+ return value;
}
}
long numeric = asLong(value, fromSchema, null);
@@ -755,7 +777,14 @@ public class Values {
}
sb.append(parser.next());
}
- return new SchemaAndValue(Schema.STRING_SCHEMA, sb.toString());
+ String content = sb.toString();
+ // We can parse string literals as temporal logical types, but
all others
+ // are treated as strings
+ SchemaAndValue parsed = parseString(content);
+ if (parsed != null &&
TEMPORAL_LOGICAL_TYPE_NAMES.contains(parsed.schema().name())) {
+ return parsed;
+ }
+ return new SchemaAndValue(Schema.STRING_SCHEMA, content);
}
}
@@ -838,7 +867,9 @@ public class Values {
}
if (!parser.canConsume(ENTRY_DELIMITER)) {
- throw new DataException("Map entry is missing '=': " +
parser.original());
+ throw new DataException("Map entry is missing '" +
ENTRY_DELIMITER
+ + "' at " + parser.position()
+ + " in " + parser.original());
}
SchemaAndValue value = parse(parser, true);
Object entryValue = value != null ? value.value() : null;
@@ -855,7 +886,7 @@ public class Values {
throw new DataException("Map is missing terminating '}': " +
parser.original());
}
} catch (DataException e) {
- LOG.debug("Unable to parse the value as a map or an array;
reverting to string", e);
+ LOG.trace("Unable to parse the value as a map or an array;
reverting to string", e);
parser.rewindTo(startPosition);
}
@@ -867,6 +898,27 @@ public class Values {
char firstChar = token.charAt(0);
boolean firstCharIsDigit = Character.isDigit(firstChar);
+
+ // Temporal types are more restrictive, so try them first
+ if (firstCharIsDigit) {
+ // The time and timestamp literals may be split into 5 tokens
since an unescaped colon
+ // is a delimiter. Check these first since the first of these
tokens is a simple numeric
+ int position = parser.mark();
+ String remainder = parser.next(4);
+ if (remainder != null) {
+ String timeOrTimestampStr = token + remainder;
+ SchemaAndValue temporal = parseAsTemporal(timeOrTimestampStr);
+ if (temporal != null) {
+ return temporal;
+ }
+ }
+ // No match was found using the 5 tokens, so rewind and see if the
current token has a date, time, or timestamp
+ parser.rewindTo(position);
+ SchemaAndValue temporal = parseAsTemporal(token);
+ if (temporal != null) {
+ return temporal;
+ }
+ }
if (firstCharIsDigit || firstChar == '+' || firstChar == '-') {
try {
// Try to parse as a number ...
@@ -901,37 +953,42 @@ public class Values {
// can't parse as a number
}
}
- if (firstCharIsDigit) {
- // Check for a date, time, or timestamp ...
- int tokenLength = token.length();
- if (tokenLength == ISO_8601_DATE_LENGTH) {
- try {
- return new SchemaAndValue(Date.SCHEMA, new
SimpleDateFormat(ISO_8601_DATE_FORMAT_PATTERN).parse(token));
- } catch (ParseException e) {
- // not a valid date
- }
- } else if (tokenLength == ISO_8601_TIME_LENGTH) {
- try {
- return new SchemaAndValue(Time.SCHEMA, new
SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN).parse(token));
- } catch (ParseException e) {
- // not a valid date
- }
- } else if (tokenLength == ISO_8601_TIMESTAMP_LENGTH) {
- try {
- return new SchemaAndValue(Timestamp.SCHEMA, new
SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(token));
- } catch (ParseException e) {
- // not a valid date
- }
- }
- }
if (embedded) {
throw new DataException("Failed to parse embedded value");
}
- // At this point, the only thing this can be is a string. Embedded
strings were processed above,
- // so this is not embedded and we can use the original string...
+ // At this point, the only thing this non-embedded value can be is a
string.
return new SchemaAndValue(Schema.STRING_SCHEMA, parser.original());
}
+ private static SchemaAndValue parseAsTemporal(String token) {
+ if (token == null) {
+ return null;
+ }
+ // If the colons were escaped, we'll see the escape chars and need to
remove them
+ token = token.replace("\\:", ":");
+ int tokenLength = token.length();
+ if (tokenLength == ISO_8601_TIME_LENGTH) {
+ try {
+ return new SchemaAndValue(Time.SCHEMA, new
SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN).parse(token));
+ } catch (ParseException e) {
+ // not a valid date
+ }
+ } else if (tokenLength == ISO_8601_TIMESTAMP_LENGTH) {
+ try {
+ return new SchemaAndValue(Timestamp.SCHEMA, new
SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(token));
+ } catch (ParseException e) {
+ // not a valid date
+ }
+ } else if (tokenLength == ISO_8601_DATE_LENGTH) {
+ try {
+ return new SchemaAndValue(Date.SCHEMA, new
SimpleDateFormat(ISO_8601_DATE_FORMAT_PATTERN).parse(token));
+ } catch (ParseException e) {
+ // not a valid date
+ }
+ }
+ return null;
+ }
+
protected static Schema commonSchemaFor(Schema previous, SchemaAndValue
latest) {
if (latest == null) {
return previous;
@@ -1088,6 +1145,7 @@ public class Values {
public void rewindTo(int position) {
iter.setIndex(position);
nextToken = null;
+ previousToken = null;
}
public String original() {
@@ -1112,6 +1170,19 @@ public class Values {
return previousToken;
}
+ public String next(int n) {
+ int current = mark();
+ int start = mark();
+ for (int i = 0; i != n; ++i) {
+ if (!hasNext()) {
+ rewindTo(start);
+ return null;
+ }
+ next();
+ }
+ return original.substring(current, position());
+ }
+
private String consumeNextToken() throws NoSuchElementException {
boolean escaped = false;
int start = iter.getIndex();
diff --git
a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
index a5909f3..c437e46 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.connect.data.Values.Parser;
import org.apache.kafka.connect.errors.DataException;
import org.junit.Test;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -383,6 +384,146 @@ public class ValuesTest {
assertEquals(str, result.value());
}
+ @Test
+ public void shouldParseTimestampStringAsTimestamp() throws Exception {
+ String str = "2019-08-23T14:34:54.346Z";
+ SchemaAndValue result = Values.parseString(str);
+ assertEquals(Type.INT64, result.schema().type());
+ assertEquals(Timestamp.LOGICAL_NAME, result.schema().name());
+ java.util.Date expected = new
SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(str);
+ assertEquals(expected, result.value());
+ }
+
+ @Test
+ public void shouldParseDateStringAsDate() throws Exception {
+ String str = "2019-08-23";
+ SchemaAndValue result = Values.parseString(str);
+ assertEquals(Type.INT32, result.schema().type());
+ assertEquals(Date.LOGICAL_NAME, result.schema().name());
+ java.util.Date expected = new
SimpleDateFormat(Values.ISO_8601_DATE_FORMAT_PATTERN).parse(str);
+ assertEquals(expected, result.value());
+ }
+
+ @Test
+ public void shouldParseTimeStringAsDate() throws Exception {
+ String str = "14:34:54.346Z";
+ SchemaAndValue result = Values.parseString(str);
+ assertEquals(Type.INT32, result.schema().type());
+ assertEquals(Time.LOGICAL_NAME, result.schema().name());
+ java.util.Date expected = new
SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(str);
+ assertEquals(expected, result.value());
+ }
+
+ @Test
+ public void shouldParseTimestampStringWithEscapedColonsAsTimestamp()
throws Exception {
+ String str = "2019-08-23T14\\:34\\:54.346Z";
+ SchemaAndValue result = Values.parseString(str);
+ assertEquals(Type.INT64, result.schema().type());
+ assertEquals(Timestamp.LOGICAL_NAME, result.schema().name());
+ String expectedStr = "2019-08-23T14:34:54.346Z";
+ java.util.Date expected = new
SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(expectedStr);
+ assertEquals(expected, result.value());
+ }
+
+ @Test
+ public void shouldParseTimeStringWithEscapedColonsAsDate() throws
Exception {
+ String str = "14\\:34\\:54.346Z";
+ SchemaAndValue result = Values.parseString(str);
+ assertEquals(Type.INT32, result.schema().type());
+ assertEquals(Time.LOGICAL_NAME, result.schema().name());
+ String expectedStr = "14:34:54.346Z";
+ java.util.Date expected = new
SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(expectedStr);
+ assertEquals(expected, result.value());
+ }
+
+ @Test
+ public void shouldParseDateStringAsDateInArray() throws Exception {
+ String dateStr = "2019-08-23";
+ String arrayStr = "[" + dateStr + "]";
+ SchemaAndValue result = Values.parseString(arrayStr);
+ assertEquals(Type.ARRAY, result.schema().type());
+ Schema elementSchema = result.schema().valueSchema();
+ assertEquals(Type.INT32, elementSchema.type());
+ assertEquals(Date.LOGICAL_NAME, elementSchema.name());
+ java.util.Date expected = new
SimpleDateFormat(Values.ISO_8601_DATE_FORMAT_PATTERN).parse(dateStr);
+ assertEquals(Collections.singletonList(expected), result.value());
+ }
+
+ @Test
+ public void shouldParseTimeStringAsTimeInArray() throws Exception {
+ String timeStr = "14:34:54.346Z";
+ String arrayStr = "[" + timeStr + "]";
+ SchemaAndValue result = Values.parseString(arrayStr);
+ assertEquals(Type.ARRAY, result.schema().type());
+ Schema elementSchema = result.schema().valueSchema();
+ assertEquals(Type.INT32, elementSchema.type());
+ assertEquals(Time.LOGICAL_NAME, elementSchema.name());
+ java.util.Date expected = new
SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(timeStr);
+ assertEquals(Collections.singletonList(expected), result.value());
+ }
+
+ @Test
+ public void shouldParseTimestampStringAsTimestampInArray() throws
Exception {
+ String tsStr = "2019-08-23T14:34:54.346Z";
+ String arrayStr = "[" + tsStr + "]";
+ SchemaAndValue result = Values.parseString(arrayStr);
+ assertEquals(Type.ARRAY, result.schema().type());
+ Schema elementSchema = result.schema().valueSchema();
+ assertEquals(Type.INT64, elementSchema.type());
+ assertEquals(Timestamp.LOGICAL_NAME, elementSchema.name());
+ java.util.Date expected = new
SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(tsStr);
+ assertEquals(Collections.singletonList(expected), result.value());
+ }
+
+ @Test
+ public void shouldParseMultipleTimestampStringAsTimestampInArray() throws
Exception {
+ String tsStr1 = "2019-08-23T14:34:54.346Z";
+ String tsStr2 = "2019-01-23T15:12:34.567Z";
+ String tsStr3 = "2019-04-23T19:12:34.567Z";
+ String arrayStr = "[" + tsStr1 + "," + tsStr2 + ", " + tsStr3 + "]";
+ SchemaAndValue result = Values.parseString(arrayStr);
+ assertEquals(Type.ARRAY, result.schema().type());
+ Schema elementSchema = result.schema().valueSchema();
+ assertEquals(Type.INT64, elementSchema.type());
+ assertEquals(Timestamp.LOGICAL_NAME, elementSchema.name());
+ java.util.Date expected1 = new
SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(tsStr1);
+ java.util.Date expected2 = new
SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(tsStr2);
+ java.util.Date expected3 = new
SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(tsStr3);
+ assertEquals(Arrays.asList(expected1, expected2, expected3),
result.value());
+ }
+
+ @Test
+ public void shouldParseQuotedTimeStringAsTimeInMap() throws Exception {
+ String keyStr = "k1";
+ String timeStr = "14:34:54.346Z";
+ String mapStr = "{\"" + keyStr + "\":\"" + timeStr + "\"}";
+ SchemaAndValue result = Values.parseString(mapStr);
+ assertEquals(Type.MAP, result.schema().type());
+ Schema keySchema = result.schema().keySchema();
+ Schema valueSchema = result.schema().valueSchema();
+ assertEquals(Type.STRING, keySchema.type());
+ assertEquals(Type.INT32, valueSchema.type());
+ assertEquals(Time.LOGICAL_NAME, valueSchema.name());
+ java.util.Date expected = new
SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(timeStr);
+ assertEquals(Collections.singletonMap(keyStr, expected),
result.value());
+ }
+
+ @Test
+ public void shouldParseTimeStringAsTimeInMap() throws Exception {
+ String keyStr = "k1";
+ String timeStr = "14:34:54.346Z";
+ String mapStr = "{\"" + keyStr + "\":" + timeStr + "}";
+ SchemaAndValue result = Values.parseString(mapStr);
+ assertEquals(Type.MAP, result.schema().type());
+ Schema keySchema = result.schema().keySchema();
+ Schema valueSchema = result.schema().valueSchema();
+ assertEquals(Type.STRING, keySchema.type());
+ assertEquals(Type.INT32, valueSchema.type());
+ assertEquals(Time.LOGICAL_NAME, valueSchema.name());
+ java.util.Date expected = new
SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(timeStr);
+ assertEquals(Collections.singletonMap(keyStr, expected),
result.value());
+ }
+
/**
* This is technically invalid JSON, and we don't want to simply ignore
the blank elements.
*/
@@ -433,6 +574,20 @@ public class ValuesTest {
}
@Test
+ public void shouldConsumeMultipleTokens() {
+ String value = "a:b:c:d:e:f:g:h";
+ Parser parser = new Parser(value);
+ String firstFive = parser.next(5);
+ assertEquals("a:b:c", firstFive);
+ assertEquals(":", parser.next());
+ assertEquals("d", parser.next());
+ assertEquals(":", parser.next());
+ String lastEight = parser.next(8); // only 7 remain
+ assertNull(lastEight);
+ assertEquals("e", parser.next());
+ }
+
+ @Test
public void shouldParseStringsWithoutDelimiters() {
//assertParsed("");
assertParsed(" ");