This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 258a01e [FLINK-18299][json] Fix the non SQL standard timestamp format
in JSON format
258a01e is described below
commit 258a01e7189372ddbb621f8b57d18257a8bab0e5
Author: Shengkai <[email protected]>
AuthorDate: Wed Jun 17 21:32:10 2020 +0800
[FLINK-18299][json] Fix the non SQL standard timestamp format in JSON format
The current timestamp format in JSON format uses RFC-3339, which is not the
SQL standard. This commit changes the default behavior to parse/generate
timestamps using the SQL standard. Besides, it introduces an option
"json.timestamp-format.standard" to provide the ability to fall back to the
ISO standard.
This closes #12661
---
docs/dev/table/connectors/formats/canal.md | 14 +++-
docs/dev/table/connectors/formats/canal.zh.md | 14 +++-
docs/dev/table/connectors/formats/debezium.md | 14 +++-
docs/dev/table/connectors/formats/debezium.zh.md | 14 +++-
docs/dev/table/connectors/formats/json.md | 12 +++
docs/dev/table/connectors/formats/json.zh.md | 12 +++
.../table/Elasticsearch6DynamicSinkITCase.java | 12 +--
.../table/Elasticsearch7DynamicSinkITCase.java | 12 +--
.../tests/util/kafka/SQLClientKafkaITCase.java | 16 ++--
.../formats/json/JsonFileSystemFormatFactory.java | 9 ++-
.../flink/formats/json/JsonFormatFactory.java | 17 ++++-
.../org/apache/flink/formats/json/JsonOptions.java | 43 +++++++++++
.../json/JsonRowDataDeserializationSchema.java | 54 ++++++-------
.../json/JsonRowDataSerializationSchema.java | 40 +++++++---
.../org/apache/flink/formats/json/TimeFormats.java | 18 ++++-
.../{JsonOptions.java => TimestampFormat.java} | 26 +++----
.../json/canal/CanalJsonDeserializationSchema.java | 7 +-
.../formats/json/canal/CanalJsonFormatFactory.java | 17 +++--
.../DebeziumJsonDeserializationSchema.java | 7 +-
.../json/debezium/DebeziumJsonFormatFactory.java | 16 ++--
.../flink/formats/json/JsonFormatFactoryTest.java | 26 ++++++-
.../formats/json/JsonRowDataSerDeSchemaTest.java | 89 ++++++++++++++++++----
.../canal/CanalJsonDeserializationSchemaTest.java | 4 +-
.../json/canal/CanalJsonFormatFactoryTest.java | 5 +-
.../DebeziumJsonDeserializationSchemaTest.java | 4 +-
.../debezium/DebeziumJsonFormatFactoryTest.java | 5 +-
26 files changed, 379 insertions(+), 128 deletions(-)
diff --git a/docs/dev/table/connectors/formats/canal.md
b/docs/dev/table/connectors/formats/canal.md
index 43b69da..6f9ea44 100644
--- a/docs/dev/table/connectors/formats/canal.md
+++ b/docs/dev/table/connectors/formats/canal.md
@@ -162,13 +162,25 @@ Format Options
<td>Specify what format to use, here should be
<code>'canal-json'</code>.</td>
</tr>
<tr>
- <td><h5>json.ignore-parse-errors</h5></td>
+ <td><h5>canal-json.ignore-parse-errors</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
+ <tr>
+ <td><h5>canal-json.timestamp-format.standard</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;"><code>'SQL'</code></td>
+ <td>String</td>
+ <td>Specify the input and output timestamp format. Currently supported
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+ <ul>
+ <li>Option <code>'SQL'</code> will parse input timestamp in
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123' and
output timestamp in the same format.</li>
+ <li>Option <code>'ISO-8601'</code> will parse input timestamp in
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123' and
output timestamp in the same format.</li>
+ </ul>
+ </td>
+ </tr>
</tbody>
</table>
diff --git a/docs/dev/table/connectors/formats/canal.zh.md
b/docs/dev/table/connectors/formats/canal.zh.md
index ff5df92..2c36128 100644
--- a/docs/dev/table/connectors/formats/canal.zh.md
+++ b/docs/dev/table/connectors/formats/canal.zh.md
@@ -162,13 +162,25 @@ Format Options
<td>Specify what format to use, here should be
<code>'canal-json'</code>.</td>
</tr>
<tr>
- <td><h5>json.ignore-parse-errors</h5></td>
+ <td><h5>canal-json.ignore-parse-errors</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
+ <tr>
+ <td><h5>canal-json.timestamp-format.standard</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;"><code>'SQL'</code></td>
+ <td>String</td>
+ <td>Specify the input and output timestamp format. Currently supported
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+ <ul>
+ <li>Option <code>'SQL'</code> will parse input timestamp in
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123' and
output timestamp in the same format.</li>
+ <li>Option <code>'ISO-8601'</code> will parse input timestamp in
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123' and
output timestamp in the same format.</li>
+ </ul>
+ </td>
+ </tr>
</tbody>
</table>
diff --git a/docs/dev/table/connectors/formats/debezium.md
b/docs/dev/table/connectors/formats/debezium.md
index 159f6d6..98acf5f 100644
--- a/docs/dev/table/connectors/formats/debezium.md
+++ b/docs/dev/table/connectors/formats/debezium.md
@@ -178,13 +178,25 @@ Format Options
This option indicates whether the Debezium JSON message includes the
schema or not. </td>
</tr>
<tr>
- <td><h5>json.ignore-parse-errors</h5></td>
+ <td><h5>debezium-json.ignore-parse-errors</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
+ <tr>
+ <td><h5>debezium-json.timestamp-format.standard</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;"><code>'SQL'</code></td>
+ <td>String</td>
+ <td>Specify the input and output timestamp format. Currently supported
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+ <ul>
+ <li>Option <code>'SQL'</code> will parse input timestamp in
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123' and
output timestamp in the same format.</li>
+ <li>Option <code>'ISO-8601'</code> will parse input timestamp in
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123' and
output timestamp in the same format.</li>
+ </ul>
+ </td>
+ </tr>
</tbody>
</table>
diff --git a/docs/dev/table/connectors/formats/debezium.zh.md
b/docs/dev/table/connectors/formats/debezium.zh.md
index b957bbc..794b4c35 100644
--- a/docs/dev/table/connectors/formats/debezium.zh.md
+++ b/docs/dev/table/connectors/formats/debezium.zh.md
@@ -178,13 +178,25 @@ Format Options
This option indicates whether the Debezium JSON message includes the
schema or not. </td>
</tr>
<tr>
- <td><h5>json.ignore-parse-errors</h5></td>
+ <td><h5>debezium-json.ignore-parse-errors</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
+ <tr>
+ <td><h5>debezium-json.timestamp-format.standard</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;"><code>'SQL'</code></td>
+ <td>String</td>
+ <td>Specify the input and output timestamp format. Currently supported
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+ <ul>
+ <li>Option <code>'SQL'</code> will parse input timestamp in
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123' and
output timestamp in the same format.</li>
+ <li>Option <code>'ISO-8601'</code> will parse input timestamp in
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123' and
output timestamp in the same format.</li>
+ </ul>
+ </td>
+ </tr>
</tbody>
</table>
diff --git a/docs/dev/table/connectors/formats/json.md
b/docs/dev/table/connectors/formats/json.md
index e4a0df9..181f48e 100644
--- a/docs/dev/table/connectors/formats/json.md
+++ b/docs/dev/table/connectors/formats/json.md
@@ -103,6 +103,18 @@ Format Options
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
+ <tr>
+ <td><h5>json.timestamp-format.standard</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;"><code>'SQL'</code></td>
+ <td>String</td>
+ <td>Specify the input and output timestamp format. Currently supported
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+ <ul>
+ <li>Option <code>'SQL'</code> will parse input timestamp in
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123' and
output timestamp in the same format.</li>
+ <li>Option <code>'ISO-8601'</code> will parse input timestamp in
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123' and
output timestamp in the same format.</li>
+ </ul>
+ </td>
+ </tr>
</tbody>
</table>
diff --git a/docs/dev/table/connectors/formats/json.zh.md
b/docs/dev/table/connectors/formats/json.zh.md
index e4a0df9..181f48e 100644
--- a/docs/dev/table/connectors/formats/json.zh.md
+++ b/docs/dev/table/connectors/formats/json.zh.md
@@ -103,6 +103,18 @@ Format Options
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
+ <tr>
+ <td><h5>json.timestamp-format.standard</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;"><code>'SQL'</code></td>
+ <td>String</td>
+ <td>Specify the input and output timestamp format. Currently supported
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+ <ul>
+ <li>Option <code>'SQL'</code> will parse input timestamp in
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123' and
output timestamp in the same format.</li>
+ <li>Option <code>'ISO-8601'</code> will parse input timestamp in
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123' and
output timestamp in the same format.</li>
+ </ul>
+ </td>
+ </tr>
</tbody>
</table>
diff --git
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
index 41a67f6..26cf90a 100644
---
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
+++
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -107,12 +107,12 @@ public class Elasticsearch6DynamicSinkITCase {
Map<String, Object> response = client.get(new GetRequest(index,
myType, "1_2012-12-12T12:12:12")).actionGet().getSource();
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
- expectedMap.put("b", "00:00:12Z");
+ expectedMap.put("b", "00:00:12");
expectedMap.put("c", "ABCDE");
expectedMap.put("d", 12.12d);
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
- expectedMap.put("g", "2012-12-12T12:12:12Z");
+ expectedMap.put("g", "2012-12-12 12:12:12");
assertThat(response, equalTo(expectedMap));
}
@@ -165,12 +165,12 @@ public class Elasticsearch6DynamicSinkITCase {
.getSource();
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
- expectedMap.put("b", "00:00:12Z");
+ expectedMap.put("b", "00:00:12");
expectedMap.put("c", "ABCDE");
expectedMap.put("d", 12.12d);
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
- expectedMap.put("g", "2012-12-12T12:12:12Z");
+ expectedMap.put("g", "2012-12-12 12:12:12");
assertThat(response, equalTo(expectedMap));
}
@@ -238,12 +238,12 @@ public class Elasticsearch6DynamicSinkITCase {
Map<String, Object> result = hits.getAt(0).getSourceAsMap();
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
- expectedMap.put("b", "00:00:12Z");
+ expectedMap.put("b", "00:00:12");
expectedMap.put("c", "ABCDE");
expectedMap.put("d", 12.12d);
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
- expectedMap.put("g", "2012-12-12T12:12:12Z");
+ expectedMap.put("g", "2012-12-12 12:12:12");
assertThat(result, equalTo(expectedMap));
}
diff --git
a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
index dbd1ff9..7f41eb7 100644
---
a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
+++
b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -105,12 +105,12 @@ public class Elasticsearch7DynamicSinkITCase {
Map<String, Object> response = client.get(new GetRequest(index,
"1_2012-12-12T12:12:12")).actionGet().getSource();
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
- expectedMap.put("b", "00:00:12Z");
+ expectedMap.put("b", "00:00:12");
expectedMap.put("c", "ABCDE");
expectedMap.put("d", 12.12d);
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
- expectedMap.put("g", "2012-12-12T12:12:12Z");
+ expectedMap.put("g", "2012-12-12 12:12:12");
assertThat(response, equalTo(expectedMap));
}
@@ -159,12 +159,12 @@ public class Elasticsearch7DynamicSinkITCase {
Map<String, Object> response = client.get(new GetRequest(index,
"1_2012-12-12T12:12:12")).actionGet().getSource();
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
- expectedMap.put("b", "00:00:12Z");
+ expectedMap.put("b", "00:00:12");
expectedMap.put("c", "ABCDE");
expectedMap.put("d", 12.12d);
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
- expectedMap.put("g", "2012-12-12T12:12:12Z");
+ expectedMap.put("g", "2012-12-12 12:12:12");
assertThat(response, equalTo(expectedMap));
}
@@ -230,12 +230,12 @@ public class Elasticsearch7DynamicSinkITCase {
Map<String, Object> result = hits.getAt(0).getSourceAsMap();
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
- expectedMap.put("b", "00:00:12Z");
+ expectedMap.put("b", "00:00:12");
expectedMap.put("c", "ABCDE");
expectedMap.put("d", 12.12d);
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
- expectedMap.put("g", "2012-12-12T12:12:12Z");
+ expectedMap.put("g", "2012-12-12 12:12:12");
assertThat(result, equalTo(expectedMap));
}
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
index 9d8d1a2..8671576 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -141,14 +141,14 @@ public class SQLClientKafkaITCase extends TestLogger {
String testAvroTopic = "test-avro-" + kafkaVersion +
"-" + UUID.randomUUID().toString();
kafka.createTopic(1, 1, testJsonTopic);
String[] messages = new String[]{
- "{\"rowtime\":
\"2018-03-12T08:00:00Z\", \"user\": \"Alice\", \"event\": { \"type\":
\"WARNING\", \"message\": \"This is a warning.\"}}",
- "{\"rowtime\":
\"2018-03-12T08:10:00Z\", \"user\": \"Alice\", \"event\": { \"type\":
\"WARNING\", \"message\": \"This is a warning.\"}}",
- "{\"rowtime\":
\"2018-03-12T09:00:00Z\", \"user\": \"Bob\", \"event\": { \"type\":
\"WARNING\", \"message\": \"This is another warning.\"}}",
- "{\"rowtime\":
\"2018-03-12T09:10:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"INFO\",
\"message\": \"This is a info.\"}}",
- "{\"rowtime\":
\"2018-03-12T09:20:00Z\", \"user\": \"Steve\", \"event\": { \"type\": \"INFO\",
\"message\": \"This is another info.\"}}",
- "{\"rowtime\":
\"2018-03-12T09:30:00Z\", \"user\": \"Steve\", \"event\": { \"type\": \"INFO\",
\"message\": \"This is another info.\"}}",
- "{\"rowtime\":
\"2018-03-12T09:30:00Z\", \"user\": null, \"event\": { \"type\": \"WARNING\",
\"message\": \"This is a bad message because the user is missing.\"}}",
- "{\"rowtime\":
\"2018-03-12T10:40:00Z\", \"user\": \"Bob\", \"event\": { \"type\": \"ERROR\",
\"message\": \"This is an error.\"}}"
+ "{\"rowtime\": \"2018-03-12 08:00:00\",
\"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is
a warning.\"}}",
+ "{\"rowtime\": \"2018-03-12 08:10:00\",
\"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is
a warning.\"}}",
+ "{\"rowtime\": \"2018-03-12 09:00:00\",
\"user\": \"Bob\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is
another warning.\"}}",
+ "{\"rowtime\": \"2018-03-12 09:10:00\",
\"user\": \"Alice\", \"event\": { \"type\": \"INFO\", \"message\": \"This is a
info.\"}}",
+ "{\"rowtime\": \"2018-03-12 09:20:00\",
\"user\": \"Steve\", \"event\": { \"type\": \"INFO\", \"message\": \"This is
another info.\"}}",
+ "{\"rowtime\": \"2018-03-12 09:30:00\",
\"user\": \"Steve\", \"event\": { \"type\": \"INFO\", \"message\": \"This is
another info.\"}}",
+ "{\"rowtime\": \"2018-03-12 09:30:00\",
\"user\": null, \"event\": { \"type\": \"WARNING\", \"message\": \"This is a
bad message because the user is missing.\"}}",
+ "{\"rowtime\": \"2018-03-12 10:40:00\",
\"user\": \"Bob\", \"event\": { \"type\": \"ERROR\", \"message\": \"This is an
error.\"}}"
};
kafka.sendMessages(testJsonTopic, messages);
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
index d464634..1ecdc73 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
@@ -47,6 +47,7 @@ import java.util.stream.Collectors;
import static
org.apache.flink.formats.json.JsonFormatFactory.validateFormatOptions;
import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD;
import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT;
/**
* Factory to build reader/writer to read/write json format file.
@@ -70,6 +71,7 @@ public class JsonFileSystemFormatFactory implements
FileSystemFormatFactory {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(FAIL_ON_MISSING_FIELD);
options.add(IGNORE_PARSE_ERRORS);
+ options.add(TIMESTAMP_FORMAT);
return options;
}
@@ -79,13 +81,15 @@ public class JsonFileSystemFormatFactory implements
FileSystemFormatFactory {
validateFormatOptions(options);
boolean failOnMissingField = options.get(FAIL_ON_MISSING_FIELD);
boolean ignoreParseErrors = options.get(IGNORE_PARSE_ERRORS);
+ TimestampFormat timestampOption =
JsonOptions.getTimestampFormat(options);
RowType formatRowType = context.getFormatRowType();
JsonRowDataDeserializationSchema deserializationSchema = new
JsonRowDataDeserializationSchema(
formatRowType,
new GenericTypeInfo(GenericRowData.class),
failOnMissingField,
- ignoreParseErrors);
+ ignoreParseErrors,
+ timestampOption);
String[] fieldNames = context.getSchema().getFieldNames();
List<String> projectFields =
Arrays.stream(context.getProjectFields())
@@ -117,7 +121,8 @@ public class JsonFileSystemFormatFactory implements
FileSystemFormatFactory {
@Override
public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
- return Optional.of(new JsonRowDataEncoder(new
JsonRowDataSerializationSchema(context.getFormatRowType())));
+ return Optional.of(new JsonRowDataEncoder(new
JsonRowDataSerializationSchema(context.getFormatRowType(),
+
JsonOptions.getTimestampFormat(context.getFormatOptions()))));
}
@Override
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
index ba98867..57952b9 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -43,6 +43,8 @@ import java.util.Set;
import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD;
import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT_ENUM;
/**
* Table format factory for providing configured instances of JSON to RowData
@@ -64,6 +66,7 @@ public class JsonFormatFactory implements
final boolean failOnMissingField =
formatOptions.get(FAIL_ON_MISSING_FIELD);
final boolean ignoreParseErrors =
formatOptions.get(IGNORE_PARSE_ERRORS);
+ TimestampFormat timestampOption =
JsonOptions.getTimestampFormat(formatOptions);
return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
@@ -77,7 +80,9 @@ public class JsonFormatFactory implements
rowType,
rowDataTypeInfo,
failOnMissingField,
- ignoreParseErrors);
+ ignoreParseErrors,
+ timestampOption
+ );
}
@Override
@@ -93,13 +98,15 @@ public class JsonFormatFactory implements
ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
+ TimestampFormat timestampOption =
JsonOptions.getTimestampFormat(formatOptions);
+
return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
public SerializationSchema<RowData>
createRuntimeEncoder(
DynamicTableSink.Context context,
DataType consumedDataType) {
final RowType rowType = (RowType)
consumedDataType.getLogicalType();
- return new
JsonRowDataSerializationSchema(rowType);
+ return new
JsonRowDataSerializationSchema(rowType, timestampOption);
}
@Override
@@ -124,6 +131,7 @@ public class JsonFormatFactory implements
Set<ConfigOption<?>> options = new HashSet<>();
options.add(FAIL_ON_MISSING_FIELD);
options.add(IGNORE_PARSE_ERRORS);
+ options.add(TIMESTAMP_FORMAT);
return options;
}
@@ -134,11 +142,16 @@ public class JsonFormatFactory implements
static void validateFormatOptions(ReadableConfig tableOptions) {
boolean failOnMissingField =
tableOptions.get(FAIL_ON_MISSING_FIELD);
boolean ignoreParseErrors =
tableOptions.get(IGNORE_PARSE_ERRORS);
+ String timestampFormat = tableOptions.get(TIMESTAMP_FORMAT);
if (ignoreParseErrors && failOnMissingField) {
throw new
ValidationException(FAIL_ON_MISSING_FIELD.key()
+ " and "
+ IGNORE_PARSE_ERRORS.key()
+ " shouldn't both be true.");
}
+ if (!TIMESTAMP_FORMAT_ENUM.contains(timestampFormat)){
+ throw new
ValidationException(String.format("Unsupported value '%s' for %s. Supported
values are [SQL, ISO-8601].",
+ timestampFormat, TIMESTAMP_FORMAT.key()));
+ }
}
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
index dca8c16..6fc726b 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
@@ -20,6 +20,12 @@ package org.apache.flink.formats.json;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
/**
* This class holds configuration constants used by json format.
@@ -38,4 +44,41 @@ public class JsonOptions {
.defaultValue(false)
.withDescription("Optional flag to skip fields and rows
with parse errors instead of failing;\n"
+ "fields are set to null in case of
errors, false by default");
+
+ public static final ConfigOption<String> TIMESTAMP_FORMAT =
ConfigOptions
+ .key("timestamp-format.standard")
+ .stringType()
+ .defaultValue("SQL")
+ .withDescription("Optional flag to specify timestamp
format, SQL by default." +
+ " Option ISO-8601 will parse input timestamp in
\"yyyy-MM-ddTHH:mm:ss.s{precision}\" format and output timestamp in the same
format." +
+ " Option SQL will parse input timestamp in
\"yyyy-MM-dd HH:mm:ss.s{precision}\" format and output timestamp in the same
format.");
+
+ //
--------------------------------------------------------------------------------------------
+ // Option enumerations
+ //
--------------------------------------------------------------------------------------------
+
+ public static final String SQL = "SQL";
+ public static final String ISO_8601 = "ISO-8601";
+
+ public static final Set<String> TIMESTAMP_FORMAT_ENUM = new
HashSet<>(Arrays.asList(
+ SQL,
+ ISO_8601
+ ));
+
+ //
--------------------------------------------------------------------------------------------
+ // Utilities
+ //
--------------------------------------------------------------------------------------------
+
+ public static TimestampFormat getTimestampFormat(ReadableConfig config){
+ String timestampFormat = config.get(TIMESTAMP_FORMAT);
+ switch (timestampFormat){
+ case SQL:
+ return TimestampFormat.SQL;
+ case ISO_8601:
+ return TimestampFormat.ISO_8601;
+ default:
+ throw new TableException(
+ String.format("Unsupported timestamp
format '%s'. Validator should have checked that.", timestampFormat));
+ }
+ }
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 87dee7f..d66ecce 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.json;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
@@ -50,7 +51,6 @@ import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
-import java.time.ZoneOffset;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalQueries;
import java.util.HashMap;
@@ -60,8 +60,9 @@ import java.util.Objects;
import static java.lang.String.format;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
-import static
org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
-import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static
org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.SQL_TIME_FORMAT;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -93,11 +94,15 @@ public class JsonRowDataDeserializationSchema implements
DeserializationSchema<R
/** Object mapper for parsing the JSON. */
private final ObjectMapper objectMapper = new ObjectMapper();
+ /** Timestamp format specification which is used to parse timestamp. */
+ private final TimestampFormat timestampFormat;
+
public JsonRowDataDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
boolean failOnMissingField,
- boolean ignoreParseErrors) {
+ boolean ignoreParseErrors,
+ TimestampFormat timestampFormat) {
if (ignoreParseErrors && failOnMissingField) {
throw new IllegalArgumentException(
"JSON format doesn't support failOnMissingField
and ignoreParseErrors are both enabled.");
@@ -106,6 +111,7 @@ public class JsonRowDataDeserializationSchema implements
DeserializationSchema<R
this.failOnMissingField = failOnMissingField;
this.ignoreParseErrors = ignoreParseErrors;
this.runtimeConverter =
createRowConverter(checkNotNull(rowType));
+ this.timestampFormat = timestampFormat;
}
@Override
@@ -142,12 +148,13 @@ public class JsonRowDataDeserializationSchema implements
DeserializationSchema<R
JsonRowDataDeserializationSchema that =
(JsonRowDataDeserializationSchema) o;
return failOnMissingField == that.failOnMissingField &&
ignoreParseErrors == that.ignoreParseErrors &&
- resultTypeInfo.equals(that.resultTypeInfo);
+ resultTypeInfo.equals(that.resultTypeInfo) &&
+ timestampFormat.equals(that.timestampFormat);
}
@Override
public int hashCode() {
- return Objects.hash(failOnMissingField, ignoreParseErrors,
resultTypeInfo);
+ return Objects.hash(failOnMissingField, ignoreParseErrors,
resultTypeInfo, timestampFormat);
}
//
-------------------------------------------------------------------------------------
@@ -193,7 +200,6 @@ public class JsonRowDataDeserializationSchema implements
DeserializationSchema<R
return this::convertToDate;
case TIME_WITHOUT_TIME_ZONE:
return this::convertToTime;
- case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return this::convertToTimestamp;
case FLOAT:
@@ -272,37 +278,25 @@ public class JsonRowDataDeserializationSchema implements
DeserializationSchema<R
}
private int convertToTime(JsonNode jsonNode) {
- // according to RFC 3339 every full-time must have a timezone;
- // until we have full timezone support, we only support UTC;
- // users can parse their time as string as a workaround
- TemporalAccessor parsedTime =
RFC3339_TIME_FORMAT.parse(jsonNode.asText());
-
- ZoneOffset zoneOffset =
parsedTime.query(TemporalQueries.offset());
+ TemporalAccessor parsedTime =
SQL_TIME_FORMAT.parse(jsonNode.asText());
LocalTime localTime =
parsedTime.query(TemporalQueries.localTime());
- if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0 ||
localTime.getNano() != 0) {
- throw new JsonParseException(
- "Invalid time format. Only a time in UTC
timezone without milliseconds is supported yet.");
- }
-
// get number of milliseconds of the day
return localTime.toSecondOfDay() * 1000;
}
private TimestampData convertToTimestamp(JsonNode jsonNode) {
- // according to RFC 3339 every date-time must have a timezone;
- // until we have full timezone support, we only support UTC;
- // users can parse their time as string as a workaround
- TemporalAccessor parsedTimestamp =
RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText());
-
- ZoneOffset zoneOffset =
parsedTimestamp.query(TemporalQueries.offset());
-
- if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0) {
- throw new JsonParseException(
- "Invalid timestamp format. Only a timestamp in
UTC timezone is supported yet. " +
- "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ TemporalAccessor parsedTimestamp;
+ switch (timestampFormat){
+ case SQL:
+ parsedTimestamp =
SQL_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+ break;
+ case ISO_8601:
+ parsedTimestamp =
ISO8601_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+ break;
+ default:
+ throw new
TableException(String.format("Unsupported timestamp format '%s'. Validator
should have checked that.", timestampFormat));
}
-
LocalTime localTime =
parsedTimestamp.query(TemporalQueries.localTime());
LocalDate localDate =
parsedTimestamp.query(TemporalQueries.localDate());
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 263e282..89b3b87 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -20,6 +20,7 @@ package org.apache.flink.formats.json;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.MapData;
@@ -45,8 +46,9 @@ import java.util.Arrays;
import java.util.Objects;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
-import static
org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
-import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static
org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.SQL_TIME_FORMAT;
/**
* Serialization schema that serializes an object of Flink internal data
structure into a JSON bytes.
@@ -72,8 +74,12 @@ public class JsonRowDataSerializationSchema implements
SerializationSchema<RowDa
/** Reusable object node. */
private transient ObjectNode node;
- public JsonRowDataSerializationSchema(RowType rowType) {
+ /** Timestamp format specification which is used to serialize timestamp. */
+ private final TimestampFormat timestampFormat;
+
+ public JsonRowDataSerializationSchema(RowType rowType, TimestampFormat
timestampFormat) {
this.rowType = rowType;
+ this.timestampFormat = timestampFormat;
this.runtimeConverter = createConverter(rowType);
}
@@ -101,12 +107,12 @@ public class JsonRowDataSerializationSchema implements
SerializationSchema<RowDa
return false;
}
JsonRowDataSerializationSchema that =
(JsonRowDataSerializationSchema) o;
- return rowType.equals(that.rowType);
+ return rowType.equals(that.rowType) &&
timestampFormat.equals(that.timestampFormat);
}
@Override
public int hashCode() {
- return Objects.hash(rowType);
+ return Objects.hash(rowType, timestampFormat);
}
//
--------------------------------------------------------------------------------
@@ -162,7 +168,6 @@ public class JsonRowDataSerializationSchema implements
SerializationSchema<RowDa
return createDateConverter();
case TIME_WITHOUT_TIME_ZONE:
return createTimeConverter();
- case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return createTimestampConverter();
case DECIMAL:
@@ -199,16 +204,27 @@ public class JsonRowDataSerializationSchema implements
SerializationSchema<RowDa
return (mapper, reuse, value) -> {
int millisecond = (int) value;
LocalTime time = LocalTime.ofSecondOfDay(millisecond /
1000L);
- return
mapper.getNodeFactory().textNode(RFC3339_TIME_FORMAT.format(time));
+ return
mapper.getNodeFactory().textNode(SQL_TIME_FORMAT.format(time));
};
}
private SerializationRuntimeConverter createTimestampConverter() {
- return (mapper, reuse, value) -> {
- TimestampData timestamp = (TimestampData) value;
- return mapper.getNodeFactory()
-
.textNode(RFC3339_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime()));
- };
+ switch (timestampFormat){
+ case ISO_8601:
+ return (mapper, reuse, value) -> {
+ TimestampData timestamp =
(TimestampData) value;
+ return mapper.getNodeFactory()
+
.textNode(ISO8601_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime()));
+ };
+ case SQL:
+ return (mapper, reuse, value) -> {
+ TimestampData timestamp =
(TimestampData) value;
+ return mapper.getNodeFactory()
+
.textNode(SQL_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime()));
+ };
+ default:
+ throw new TableException(String.format(
"Unsupported timestamp format '%s'. Validator should have checked that.", timestampFormat));
+ }
}
private SerializationRuntimeConverter createArrayConverter(ArrayType
type) {
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
index c946c5d..693781b 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
@@ -23,7 +23,7 @@ import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
/**
- * Time formats respecting the RFC3339 specification.
+ * Time formats and timestamp formats respecting the RFC3339 specification,
ISO-8601 specification and SQL specification.
*/
class TimeFormats {
@@ -41,6 +41,22 @@ class TimeFormats {
.append(RFC3339_TIME_FORMAT)
.toFormatter();
+ /** Formatter for ISO8601 string representation of a timestamp value
(without UTC timezone). */
+ static final DateTimeFormatter ISO8601_TIMESTAMP_FORMAT =
DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+
+ /** Formatter for SQL string representation of a time value. */
+ static final DateTimeFormatter SQL_TIME_FORMAT = new
DateTimeFormatterBuilder()
+ .appendPattern("HH:mm:ss")
+ .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+ .toFormatter();
+
+ /** Formatter for SQL string representation of a timestamp value
(without UTC timezone). */
+ static final DateTimeFormatter SQL_TIMESTAMP_FORMAT = new
DateTimeFormatterBuilder()
+ .append(DateTimeFormatter.ISO_LOCAL_DATE)
+ .appendLiteral(' ')
+ .append(SQL_TIME_FORMAT)
+ .toFormatter();
+
private TimeFormats() {
}
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimestampFormat.java
similarity index 52%
copy from
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
copy to
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimestampFormat.java
index dca8c16..0c31fd4 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimestampFormat.java
@@ -18,24 +18,18 @@
package org.apache.flink.formats.json;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.annotation.Internal;
/**
- * This class holds configuration constants used by json format.
+ * Timestamp format Enums.
*/
-public class JsonOptions {
+@Internal
+public enum TimestampFormat {
+ /** Options to specify timestamp format. It will parse timestamp in
"yyyy-MM-dd HH:mm:ss.s{precision}" format
+ * and output timestamp in the same format. */
+ SQL,
- public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD =
ConfigOptions
- .key("fail-on-missing-field")
- .booleanType()
- .defaultValue(false)
- .withDescription("Optional flag to specify whether to
fail if a field is missing or not, false by default");
-
- public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
ConfigOptions
- .key("ignore-parse-errors")
- .booleanType()
- .defaultValue(false)
- .withDescription("Optional flag to skip fields and rows
with parse errors instead of failing;\n"
- + "fields are set to null in case of
errors, false by default");
+ /** Options to specify timestamp format. It will parse timestamp in
"yyyy-MM-dd'T'HH:mm:ss.s{precision}" format
+ * and output timestamp in the same format. */
+ ISO_8601
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
index 4c03d04..761a0f4 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.json.canal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericRowData;
@@ -70,7 +71,8 @@ public final class CanalJsonDeserializationSchema implements
DeserializationSche
public CanalJsonDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
- boolean ignoreParseErrors) {
+ boolean ignoreParseErrors,
+ TimestampFormat timestampFormatOption) {
this.resultTypeInfo = resultTypeInfo;
this.ignoreParseErrors = ignoreParseErrors;
this.fieldCount = rowType.getFieldCount();
@@ -79,7 +81,8 @@ public final class CanalJsonDeserializationSchema implements
DeserializationSche
// the result type is never used, so it's fine to pass
in Canal's result type
resultTypeInfo,
false, // ignoreParseErrors already contains the
functionality of failOnMissingField
- ignoreParseErrors);
+ ignoreParseErrors,
+ timestampFormatOption);
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
index fba98bf..cedff80 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
@@ -22,8 +22,9 @@ import
org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
@@ -48,12 +49,9 @@ public class CanalJsonFormatFactory implements
DeserializationFormatFactory, Ser
public static final String IDENTIFIER = "canal-json";
- public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
ConfigOptions
- .key("ignore-parse-errors")
- .booleanType()
- .defaultValue(false)
- .withDescription("Optional flag to skip fields and rows with
parse errors instead of failing, " +
- "fields are set to null in case of errors. Default is
false.");
+ public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
JsonOptions.IGNORE_PARSE_ERRORS;
+
+ public static final ConfigOption<String> TIMESTAMP_FORMAT =
JsonOptions.TIMESTAMP_FORMAT;
@SuppressWarnings("unchecked")
@Override
@@ -62,6 +60,7 @@ public class CanalJsonFormatFactory implements
DeserializationFormatFactory, Ser
ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
final boolean ignoreParseErrors =
formatOptions.get(IGNORE_PARSE_ERRORS);
+ TimestampFormat timestampFormatOption =
JsonOptions.getTimestampFormat(formatOptions);
return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
@@ -73,7 +72,8 @@ public class CanalJsonFormatFactory implements
DeserializationFormatFactory, Ser
return new CanalJsonDeserializationSchema(
rowType,
rowDataTypeInfo,
- ignoreParseErrors);
+ ignoreParseErrors,
+ timestampFormatOption);
}
@Override
@@ -109,6 +109,7 @@ public class CanalJsonFormatFactory implements
DeserializationFormatFactory, Ser
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(IGNORE_PARSE_ERRORS);
+ options.add(TIMESTAMP_FORMAT);
return options;
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
index f0ae9d9..d1f1d96 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -78,7 +79,8 @@ public final class DebeziumJsonDeserializationSchema
implements DeserializationS
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
boolean schemaInclude,
- boolean ignoreParseErrors) {
+ boolean ignoreParseErrors,
+ TimestampFormat timestampFormatOption) {
this.resultTypeInfo = resultTypeInfo;
this.schemaInclude = schemaInclude;
this.ignoreParseErrors = ignoreParseErrors;
@@ -87,7 +89,8 @@ public final class DebeziumJsonDeserializationSchema
implements DeserializationS
// the result type is never used, so it's fine to pass
in Debezium's result type
resultTypeInfo,
false, // ignoreParseErrors already contains the
functionality of failOnMissingField
- ignoreParseErrors);
+ ignoreParseErrors,
+ timestampFormatOption);
}
@Override
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
index 3458014..17c6ab2 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
@@ -57,12 +59,9 @@ public class DebeziumJsonFormatFactory implements
DeserializationFormatFactory,
"This option indicates the Debezium JSON data include
the schema in the message or not. " +
"Default is false.");
- public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
ConfigOptions
- .key("ignore-parse-errors")
- .booleanType()
- .defaultValue(false)
- .withDescription("Optional flag to skip fields and rows with
parse errors instead of failing, " +
- "fields are set to null in case of errors. Default is
false.");
+ public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
JsonOptions.IGNORE_PARSE_ERRORS;
+
+ public static final ConfigOption<String> TIMESTAMP_FORMAT =
JsonOptions.TIMESTAMP_FORMAT;
@SuppressWarnings("unchecked")
@Override
@@ -72,6 +71,7 @@ public class DebeziumJsonFormatFactory implements
DeserializationFormatFactory,
FactoryUtil.validateFactoryOptions(this, formatOptions);
final boolean schemaInclude = formatOptions.get(SCHEMA_INCLUDE);
final boolean ignoreParseErrors =
formatOptions.get(IGNORE_PARSE_ERRORS);
+ TimestampFormat timestampFormatOption =
JsonOptions.getTimestampFormat(formatOptions);
return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
@@ -84,7 +84,8 @@ public class DebeziumJsonFormatFactory implements
DeserializationFormatFactory,
rowType,
rowDataTypeInfo,
schemaInclude,
- ignoreParseErrors);
+ ignoreParseErrors,
+ timestampFormatOption);
}
@Override
@@ -121,6 +122,7 @@ public class DebeziumJsonFormatFactory implements
DeserializationFormatFactory,
Set<ConfigOption<?>> options = new HashSet<>();
options.add(SCHEMA_INCLUDE);
options.add(IGNORE_PARSE_ERRORS);
+ options.add(TIMESTAMP_FORMAT);
return options;
}
diff --git
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
index c12eecf..568563e 100644
---
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
+++
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
@@ -91,6 +91,25 @@ public class JsonFormatFactoryTest extends TestLogger {
testSchemaDeserializationSchema(tableOptions);
}
+ @Test
+ public void testInvalidOptionForTimestampFormat() {
+ final Map<String, String> tableOptions = getModifyOptions(
+ options ->
options.put("json.timestamp-format.standard", "test"));
+
+ thrown.expect(ValidationException.class);
+ thrown.expect(containsCause(new
ValidationException("Unsupported value 'test' for timestamp-format.standard.
Supported values are [SQL, ISO-8601].")));
+ testSchemaDeserializationSchema(tableOptions);
+ }
+
+ @Test
+ public void testLowerCaseOptionForTimestampFormat() {
+ final Map<String, String> tableOptions = getModifyOptions(
+ options ->
options.put("json.timestamp-format.standard", "iso-8601"));
+
+ thrown.expect(ValidationException.class);
+ thrown.expect(containsCause(new
ValidationException("Unsupported value 'iso-8601' for
timestamp-format.standard. Supported values are [SQL, ISO-8601].")));
+ testSchemaDeserializationSchema(tableOptions);
+ }
//
------------------------------------------------------------------------
// Utilities
//
------------------------------------------------------------------------
@@ -101,7 +120,8 @@ public class JsonFormatFactoryTest extends TestLogger {
ROW_TYPE,
new RowDataTypeInfo(ROW_TYPE),
false,
- true);
+ true,
+ TimestampFormat.ISO_8601);
final DynamicTableSource actualSource =
createTableSource(options);
assert actualSource instanceof
TestDynamicTableFactory.DynamicTableSourceMock;
@@ -117,7 +137,8 @@ public class JsonFormatFactoryTest extends TestLogger {
}
private void testSchemaSerializationSchema(Map<String, String> options)
{
- final JsonRowDataSerializationSchema expectedSer = new
JsonRowDataSerializationSchema(ROW_TYPE);
+ final JsonRowDataSerializationSchema expectedSer = new
JsonRowDataSerializationSchema(ROW_TYPE,
+ TimestampFormat.ISO_8601);
final DynamicTableSink actualSink = createTableSink(options);
assert actualSink instanceof
TestDynamicTableFactory.DynamicTableSinkMock;
@@ -152,6 +173,7 @@ public class JsonFormatFactoryTest extends TestLogger {
options.put("format", JsonFormatFactory.IDENTIFIER);
options.put("json.fail-on-missing-field", "false");
options.put("json.ignore-parse-errors", "true");
+ options.put("json.timestamp-format.standard", "ISO-8601");
return options;
}
diff --git
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index dcfae9c..fedfbea 100644
---
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -114,9 +114,9 @@ public class JsonRowDataSerDeSchemaTest {
root.put("decimal", decimal);
root.set("doubles", doubleNode);
root.put("date", "1990-10-14");
- root.put("time", "12:12:43Z");
- root.put("timestamp3", "1990-10-14T12:12:43.123Z");
- root.put("timestamp9", "1990-10-14T12:12:43.123456789Z");
+ root.put("time", "12:12:43");
+ root.put("timestamp3", "1990-10-14T12:12:43.123");
+ root.put("timestamp9", "1990-10-14T12:12:43.123456789");
root.putObject("map").put("flink", 123);
root.putObject("map2map").putObject("inner_map").put("key",
234);
@@ -143,7 +143,7 @@ public class JsonRowDataSerDeSchemaTest {
RowDataTypeInfo resultTypeInfo = new RowDataTypeInfo(schema);
JsonRowDataDeserializationSchema deserializationSchema = new
JsonRowDataDeserializationSchema(
- schema, resultTypeInfo, false, false);
+ schema, resultTypeInfo, false, false,
TimestampFormat.ISO_8601);
Row expected = new Row(16);
expected.setField(0, true);
@@ -168,7 +168,7 @@ public class JsonRowDataSerDeSchemaTest {
assertEquals(expected, actual);
// test serialization
- JsonRowDataSerializationSchema serializationSchema = new
JsonRowDataSerializationSchema(schema);
+ JsonRowDataSerializationSchema serializationSchema = new
JsonRowDataSerializationSchema(schema, TimestampFormat.ISO_8601);
byte[] actualBytes = serializationSchema.serialize(rowData);
assertEquals(new String(serializedJson), new
String(actualBytes));
@@ -211,7 +211,7 @@ public class JsonRowDataSerDeSchemaTest {
RowType rowType = (RowType) dataType.getLogicalType();
JsonRowDataDeserializationSchema deserializationSchema = new
JsonRowDataDeserializationSchema(
- rowType, new RowDataTypeInfo(rowType), false, false);
+ rowType, new RowDataTypeInfo(rowType), false, false,
TimestampFormat.ISO_8601);
Row expected = new Row(7);
expected.setField(0, bool);
@@ -236,8 +236,8 @@ public class JsonRowDataSerDeSchemaTest {
).getLogicalType();
JsonRowDataDeserializationSchema deserializationSchema = new
JsonRowDataDeserializationSchema(
- rowType, new RowDataTypeInfo(rowType), false, false);
- JsonRowDataSerializationSchema serializationSchema = new
JsonRowDataSerializationSchema(rowType);
+ rowType, new RowDataTypeInfo(rowType), false, false,
TimestampFormat.ISO_8601);
+ JsonRowDataSerializationSchema serializationSchema = new
JsonRowDataSerializationSchema(rowType, TimestampFormat.ISO_8601);
ObjectMapper objectMapper = new ObjectMapper();
@@ -290,8 +290,8 @@ public class JsonRowDataSerDeSchemaTest {
).getLogicalType();
JsonRowDataDeserializationSchema deserializationSchema = new
JsonRowDataDeserializationSchema(
- rowType, new RowDataTypeInfo(rowType), false, true);
- JsonRowDataSerializationSchema serializationSchema = new
JsonRowDataSerializationSchema(rowType);
+ rowType, new RowDataTypeInfo(rowType), false, true,
TimestampFormat.ISO_8601);
+ JsonRowDataSerializationSchema serializationSchema = new
JsonRowDataSerializationSchema(rowType, TimestampFormat.ISO_8601);
for (int i = 0; i < jsons.length; i++) {
String json = jsons[i];
@@ -315,7 +315,7 @@ public class JsonRowDataSerDeSchemaTest {
// pass on missing field
JsonRowDataDeserializationSchema deserializationSchema = new
JsonRowDataDeserializationSchema(
- schema, new RowDataTypeInfo(schema), false, false);
+ schema, new RowDataTypeInfo(schema), false, false,
TimestampFormat.ISO_8601);
Row expected = new Row(1);
Row actual =
convertToExternal(deserializationSchema.deserialize(serializedJson), dataType);
@@ -323,7 +323,7 @@ public class JsonRowDataSerDeSchemaTest {
// fail on missing field
deserializationSchema = deserializationSchema = new
JsonRowDataDeserializationSchema(
- schema, new RowDataTypeInfo(schema), true, false);
+ schema, new RowDataTypeInfo(schema), true, false,
TimestampFormat.ISO_8601);
thrown.expect(IOException.class);
thrown.expectMessage("Failed to deserialize JSON
'{\"id\":123123123}'");
@@ -331,7 +331,7 @@ public class JsonRowDataSerDeSchemaTest {
// ignore on parse error
deserializationSchema = new JsonRowDataDeserializationSchema(
- schema, new RowDataTypeInfo(schema), false, true);
+ schema, new RowDataTypeInfo(schema), false, true,
TimestampFormat.ISO_8601);
actual =
convertToExternal(deserializationSchema.deserialize(serializedJson), dataType);
assertEquals(expected, actual);
@@ -340,7 +340,29 @@ public class JsonRowDataSerDeSchemaTest {
// failOnMissingField and ignoreParseErrors both enabled
//noinspection ConstantConditions
new JsonRowDataDeserializationSchema(
- schema, new RowDataTypeInfo(schema), true, true);
+ schema, new RowDataTypeInfo(schema), true, true,
TimestampFormat.ISO_8601);
+ }
+
+ @Test
+ public void testSerDeSQLTimestampFormat() throws Exception{
+ RowType rowType = (RowType) ROW(
+ FIELD("timestamp3", TIMESTAMP(3)),
+ FIELD("timestamp9", TIMESTAMP(9))
+ ).getLogicalType();
+
+ JsonRowDataDeserializationSchema deserializationSchema = new
JsonRowDataDeserializationSchema(
+ rowType, new RowDataTypeInfo(rowType), false, false,
TimestampFormat.SQL);
+ JsonRowDataSerializationSchema serializationSchema = new
JsonRowDataSerializationSchema(rowType, TimestampFormat.SQL);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ ObjectNode root = objectMapper.createObjectNode();
+ root.put("timestamp3", "1990-10-14 12:12:43.123");
+ root.put("timestamp9", "1990-10-14 12:12:43.123456789");
+ byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ RowData rowData =
deserializationSchema.deserialize(serializedJson);
+ byte[] actual = serializationSchema.serialize(rowData);
+ assertEquals(new String(serializedJson), new String(actual));
}
@Test
@@ -356,7 +378,8 @@ public class JsonRowDataSerDeSchemaTest {
private void testIgnoreParseErrors(TestSpec spec) throws Exception {
// the parsing field should be null and no exception is thrown
JsonRowDataDeserializationSchema ignoreErrorsSchema = new
JsonRowDataDeserializationSchema(
- spec.rowType, new RowDataTypeInfo(spec.rowType),
false, true);
+ spec.rowType, new RowDataTypeInfo(spec.rowType),
false, true,
+ TimestampFormat.ISO_8601);
Row expected;
if (spec.expected != null) {
expected = spec.expected;
@@ -373,7 +396,8 @@ public class JsonRowDataSerDeSchemaTest {
private void testParseErrors(TestSpec spec) throws Exception {
// expect exception if parse error is not ignored
JsonRowDataDeserializationSchema failingSchema = new
JsonRowDataDeserializationSchema(
- spec.rowType, new RowDataTypeInfo(spec.rowType),
false, false);
+ spec.rowType, new RowDataTypeInfo(spec.rowType),
false, false,
+ spec.timestampFormat);
thrown.expectMessage(spec.errorMessage);
failingSchema.deserialize(spec.json.getBytes());
@@ -421,6 +445,16 @@ public class JsonRowDataSerDeSchemaTest {
.expectErrorMessage("Failed to deserialize JSON
'{\"id\":\"18:00:243\"}'"),
TestSpec
+ .json("{\"id\":\"18:00:243\"}")
+ .rowType(ROW(FIELD("id", TIME())))
+ .expectErrorMessage("Failed to deserialize JSON
'{\"id\":\"18:00:243\"}'"),
+
+ TestSpec
+ .json("{\"id\":\"20191112\"}")
+ .rowType(ROW(FIELD("id", DATE())))
+ .expectErrorMessage("Failed to deserialize JSON
'{\"id\":\"20191112\"}'"),
+
+ TestSpec
.json("{\"id\":\"20191112\"}")
.rowType(ROW(FIELD("id", DATE())))
.expectErrorMessage("Failed to deserialize JSON
'{\"id\":\"20191112\"}'"),
@@ -428,9 +462,26 @@ public class JsonRowDataSerDeSchemaTest {
TestSpec
.json("{\"id\":\"2019-11-12 18:00:12\"}")
.rowType(ROW(FIELD("id", TIMESTAMP(0))))
+ .timestampFormat(TimestampFormat.ISO_8601)
.expectErrorMessage("Failed to deserialize JSON
'{\"id\":\"2019-11-12 18:00:12\"}'"),
TestSpec
+ .json("{\"id\":\"2019-11-12T18:00:12\"}")
+ .rowType(ROW(FIELD("id", TIMESTAMP(0))))
+ .expectErrorMessage("Failed to deserialize JSON
'{\"id\":\"2019-11-12T18:00:12\"}'"),
+
+ TestSpec
+ .json("{\"id\":\"2019-11-12T18:00:12Z\"}")
+ .rowType(ROW(FIELD("id", TIMESTAMP(0))))
+ .expectErrorMessage("Failed to deserialize JSON
'{\"id\":\"2019-11-12T18:00:12Z\"}'"),
+
+ TestSpec
+ .json("{\"id\":\"2019-11-12T18:00:12Z\"}")
+ .rowType(ROW(FIELD("id", TIMESTAMP(0))))
+ .timestampFormat(TimestampFormat.ISO_8601)
+ .expectErrorMessage("Failed to deserialize JSON
'{\"id\":\"2019-11-12T18:00:12Z\"}'"),
+
+ TestSpec
.json("{\"id\":\"abc\"}")
.rowType(ROW(FIELD("id", DECIMAL(10, 3))))
.expectErrorMessage("Failed to deserialize JSON
'{\"id\":\"abc\"}'"),
@@ -471,6 +522,7 @@ public class JsonRowDataSerDeSchemaTest {
private static class TestSpec {
private final String json;
private RowType rowType;
+ private TimestampFormat timestampFormat = TimestampFormat.SQL;
private Row expected;
private String errorMessage;
@@ -496,5 +548,10 @@ public class JsonRowDataSerDeSchemaTest {
this.errorMessage = errorMessage;
return this;
}
+
+ TestSpec timestampFormat(TimestampFormat timestampFormat){
+ this.timestampFormat = timestampFormat;
+ return this;
+ }
}
}
diff --git
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchemaTest.java
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchemaTest.java
index 02f055e..36099ef 100644
---
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchemaTest.java
+++
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchemaTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.formats.json.canal;
+import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.logical.RowType;
@@ -65,7 +66,8 @@ public class CanalJsonDeserializationSchemaTest {
CanalJsonDeserializationSchema deserializationSchema = new
CanalJsonDeserializationSchema(
SCHEMA,
new RowDataTypeInfo(SCHEMA),
- false);
+ false,
+ TimestampFormat.ISO_8601);
SimpleCollector collector = new SimpleCollector();
for (String line : lines) {
diff --git
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
index 9d64079..102ba88 100644
---
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
+++
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.formats.json.canal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTableImpl;
@@ -65,7 +66,8 @@ public class CanalJsonFormatFactoryTest extends TestLogger {
final CanalJsonDeserializationSchema expectedDeser = new
CanalJsonDeserializationSchema(
ROW_TYPE,
new RowDataTypeInfo(ROW_TYPE),
- true);
+ true,
+ TimestampFormat.ISO_8601);
final Map<String, String> options = getAllOptions();
@@ -120,6 +122,7 @@ public class CanalJsonFormatFactoryTest extends TestLogger {
options.put("format", "canal-json");
options.put("canal-json.ignore-parse-errors", "true");
+ options.put("canal-json.timestamp-format.standard", "ISO-8601");
return options;
}
diff --git
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchemaTest.java
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchemaTest.java
index ff7a343..d8114a8 100644
---
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchemaTest.java
+++
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchemaTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.formats.json.debezium;
+import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.logical.RowType;
@@ -75,7 +76,8 @@ public class DebeziumJsonDeserializationSchemaTest {
SCHEMA,
new RowDataTypeInfo(SCHEMA),
schemaInclude,
- false);
+ false,
+ TimestampFormat.ISO_8601);
SimpleCollector collector = new SimpleCollector();
for (String line : lines) {
diff --git
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
index 59dd3dd..e19581e 100644
---
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
+++
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.formats.json.debezium;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTableImpl;
@@ -66,7 +67,8 @@ public class DebeziumJsonFormatFactoryTest extends TestLogger
{
ROW_TYPE,
new RowDataTypeInfo(ROW_TYPE),
true,
- true);
+ true,
+ TimestampFormat.ISO_8601);
final Map<String, String> options = getAllOptions();
@@ -122,6 +124,7 @@ public class DebeziumJsonFormatFactoryTest extends
TestLogger {
options.put("format", "debezium-json");
options.put("debezium-json.ignore-parse-errors", "true");
options.put("debezium-json.schema-include", "true");
+ options.put("debezium-json.timestamp-format.standard",
"ISO-8601");
return options;
}