This is an automated email from the ASF dual-hosted git repository.
sjwiesman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 61f1f0f [FLINK-23073][formats / CSV] Fix space handling in Row CSV timestamp parser
61f1f0f is described below
commit 61f1f0f4f09875b5d8f4c2db956aa520facc4b2c
Author: Seth Wiesman <[email protected]>
AuthorDate: Mon Jun 21 13:37:41 2021 -0500
[FLINK-23073][formats / CSV] Fix space handling in Row CSV timestamp parser
This closes #16246
---
.../org/apache/flink/formats/csv/CsvRowDeserializationSchema.java | 2 +-
.../java/org/apache/flink/formats/csv/CsvToRowDataConverters.java | 2 +-
.../apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java | 6 ++++++
3 files changed, 8 insertions(+), 2 deletions(-)
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
index 535767a..783e1f1 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
@@ -321,7 +321,7 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
} else if (info.equals(Types.LOCAL_TIME)) {
return (node) -> Time.valueOf(node.asText()).toLocalTime();
} else if (info.equals(Types.LOCAL_DATE_TIME)) {
- return (node) -> LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_FORMAT);
+ return (node) -> LocalDateTime.parse(node.asText().trim(), SQL_TIMESTAMP_FORMAT);
} else if (info.equals(Types.INSTANT)) {
return (node) ->
LocalDateTime.parse(node.asText(),
SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT)
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
index ce19755..64ddc68 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
@@ -254,7 +254,7 @@ public class CsvToRowDataConverters implements Serializable {
private TimestampData convertToTimestamp(
JsonNode jsonNode, DateTimeFormatter dateTimeFormatter) {
return TimestampData.fromLocalDateTime(
- LocalDateTime.parse(jsonNode.asText(), dateTimeFormatter));
+ LocalDateTime.parse(jsonNode.asText().trim(), dateTimeFormatter));
}
private StringData convertToString(JsonNode jsonNode) {
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
index 3e064ea..80a32d0 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
@@ -123,6 +123,12 @@ public class CsvRowDeSerializationSchemaTest {
testField(Types.INT, " 12 ", 12, deserConfig, ";");
testField(Types.INT, "12", 12, serConfig, deserConfig, ";");
testField(
+ Types.LOCAL_DATE_TIME,
+ " 2018-10-12 12:12:12 ",
+ LocalDateTime.parse("2018-10-12T12:12:12"),
+ deserConfig,
+ ";");
+ testField(
Types.ROW(Types.STRING, Types.STRING),
"1:hello",
Row.of("1", "hello"),