This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 258a01e  [FLINK-18299][json] Fix the non SQL standard timestamp format 
in JSON format
258a01e is described below

commit 258a01e7189372ddbb621f8b57d18257a8bab0e5
Author: Shengkai <[email protected]>
AuthorDate: Wed Jun 17 21:32:10 2020 +0800

    [FLINK-18299][json] Fix the non SQL standard timestamp format in JSON format
    
    
    The JSON format currently parses and generates timestamps in RFC-3339 
format, which is not the SQL standard. This commit changes the default 
behavior to parse/generate timestamps using the SQL standard format. In 
addition, it introduces the option "json.timestamp-format.standard", which 
makes it possible to fall back to the ISO-8601 standard format.
    
    This closes #12661
---
 docs/dev/table/connectors/formats/canal.md         | 14 +++-
 docs/dev/table/connectors/formats/canal.zh.md      | 14 +++-
 docs/dev/table/connectors/formats/debezium.md      | 14 +++-
 docs/dev/table/connectors/formats/debezium.zh.md   | 14 +++-
 docs/dev/table/connectors/formats/json.md          | 12 +++
 docs/dev/table/connectors/formats/json.zh.md       | 12 +++
 .../table/Elasticsearch6DynamicSinkITCase.java     | 12 +--
 .../table/Elasticsearch7DynamicSinkITCase.java     | 12 +--
 .../tests/util/kafka/SQLClientKafkaITCase.java     | 16 ++--
 .../formats/json/JsonFileSystemFormatFactory.java  |  9 ++-
 .../flink/formats/json/JsonFormatFactory.java      | 17 ++++-
 .../org/apache/flink/formats/json/JsonOptions.java | 43 +++++++++++
 .../json/JsonRowDataDeserializationSchema.java     | 54 ++++++-------
 .../json/JsonRowDataSerializationSchema.java       | 40 +++++++---
 .../org/apache/flink/formats/json/TimeFormats.java | 18 ++++-
 .../{JsonOptions.java => TimestampFormat.java}     | 26 +++----
 .../json/canal/CanalJsonDeserializationSchema.java |  7 +-
 .../formats/json/canal/CanalJsonFormatFactory.java | 17 +++--
 .../DebeziumJsonDeserializationSchema.java         |  7 +-
 .../json/debezium/DebeziumJsonFormatFactory.java   | 16 ++--
 .../flink/formats/json/JsonFormatFactoryTest.java  | 26 ++++++-
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 89 ++++++++++++++++++----
 .../canal/CanalJsonDeserializationSchemaTest.java  |  4 +-
 .../json/canal/CanalJsonFormatFactoryTest.java     |  5 +-
 .../DebeziumJsonDeserializationSchemaTest.java     |  4 +-
 .../debezium/DebeziumJsonFormatFactoryTest.java    |  5 +-
 26 files changed, 379 insertions(+), 128 deletions(-)

diff --git a/docs/dev/table/connectors/formats/canal.md 
b/docs/dev/table/connectors/formats/canal.md
index 43b69da..6f9ea44 100644
--- a/docs/dev/table/connectors/formats/canal.md
+++ b/docs/dev/table/connectors/formats/canal.md
@@ -162,13 +162,25 @@ Format Options
       <td>Specify what format to use, here should be 
<code>'canal-json'</code>.</td>
     </tr>
     <tr>
-      <td><h5>json.ignore-parse-errors</h5></td>
+      <td><h5>canal-json.ignore-parse-errors</h5></td>
       <td>optional</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>Skip fields and rows with parse errors instead of failing.
       Fields are set to null in case of errors.</td>
     </tr>
+    <tr>
+      <td><h5>canal-json.timestamp-format.standard</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;"><code>'SQL'</code></td>
+      <td>String</td>
+      <td>Specify the input and output timestamp format. Currently supported 
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+      <ul>
+        <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123' and 
output timestamp in the same format.</li>
+        <li>Option <code>'ISO-8601'</code> will parse input timestamp in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123' and 
output timestamp in the same format.</li>
+      </ul>
+      </td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/dev/table/connectors/formats/canal.zh.md 
b/docs/dev/table/connectors/formats/canal.zh.md
index ff5df92..2c36128 100644
--- a/docs/dev/table/connectors/formats/canal.zh.md
+++ b/docs/dev/table/connectors/formats/canal.zh.md
@@ -162,13 +162,25 @@ Format Options
       <td>Specify what format to use, here should be 
<code>'canal-json'</code>.</td>
     </tr>
     <tr>
-      <td><h5>json.ignore-parse-errors</h5></td>
+      <td><h5>canal-json.ignore-parse-errors</h5></td>
       <td>optional</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>Skip fields and rows with parse errors instead of failing.
       Fields are set to null in case of errors.</td>
     </tr>
+    <tr>
+       <td><h5>canal-json.timestamp-format.standard</h5></td>
+       <td>optional</td>
+       <td style="word-wrap: break-word;"><code>'SQL'</code></td>
+       <td>String</td>
+       <td>Specify the input and output timestamp format. Currently supported 
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+       <ul>
+         <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123' and 
output timestamp in the same format.</li>
+         <li>Option <code>'ISO-8601'</code> will parse input timestamp in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123' and 
output timestamp in the same format.</li>
+       </ul>
+       </td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/dev/table/connectors/formats/debezium.md 
b/docs/dev/table/connectors/formats/debezium.md
index 159f6d6..98acf5f 100644
--- a/docs/dev/table/connectors/formats/debezium.md
+++ b/docs/dev/table/connectors/formats/debezium.md
@@ -178,13 +178,25 @@ Format Options
           This option indicates whether the Debezium JSON message includes the 
schema or not. </td>
     </tr>
     <tr>
-      <td><h5>json.ignore-parse-errors</h5></td>
+      <td><h5>debezium-json.ignore-parse-errors</h5></td>
       <td>optional</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>Skip fields and rows with parse errors instead of failing.
       Fields are set to null in case of errors.</td>
     </tr>
+    <tr>
+       <td><h5>debezium-json.timestamp-format.standard</h5></td>
+       <td>optional</td>
+       <td style="word-wrap: break-word;"><code>'SQL'</code></td>
+       <td>String</td>
+       <td>Specify the input and output timestamp format. Currently supported 
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+       <ul>
+         <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123' and 
output timestamp in the same format.</li>
+         <li>Option <code>'ISO-8601'</code> will parse input timestamp in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123' and 
output timestamp in the same format.</li>
+       </ul>
+       </td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/dev/table/connectors/formats/debezium.zh.md 
b/docs/dev/table/connectors/formats/debezium.zh.md
index b957bbc..794b4c35 100644
--- a/docs/dev/table/connectors/formats/debezium.zh.md
+++ b/docs/dev/table/connectors/formats/debezium.zh.md
@@ -178,13 +178,25 @@ Format Options
           This option indicates whether the Debezium JSON message includes the 
schema or not. </td>
     </tr>
     <tr>
-      <td><h5>json.ignore-parse-errors</h5></td>
+      <td><h5>debezium-json.ignore-parse-errors</h5></td>
       <td>optional</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>Skip fields and rows with parse errors instead of failing.
       Fields are set to null in case of errors.</td>
     </tr>
+    <tr>
+      <td><h5>debezium-json.timestamp-format.standard</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;"><code>'SQL'</code></td>
+      <td>String</td>
+      <td>Specify the input and output timestamp format. Currently supported 
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+      <ul>
+        <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123' and 
output timestamp in the same format.</li>
+        <li>Option <code>'ISO-8601'</code> will parse input timestamp in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123' and 
output timestamp in the same format.</li>
+      </ul>
+      </td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/dev/table/connectors/formats/json.md 
b/docs/dev/table/connectors/formats/json.md
index e4a0df9..181f48e 100644
--- a/docs/dev/table/connectors/formats/json.md
+++ b/docs/dev/table/connectors/formats/json.md
@@ -103,6 +103,18 @@ Format Options
       <td>Skip fields and rows with parse errors instead of failing.
       Fields are set to null in case of errors.</td>
     </tr>
+    <tr>
+      <td><h5>json.timestamp-format.standard</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;"><code>'SQL'</code></td>
+      <td>String</td>
+      <td>Specify the input and output timestamp format. Currently supported 
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+      <ul>
+        <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123' and 
output timestamp in the same format.</li>
+        <li>Option <code>'ISO-8601'</code> will parse input timestamp in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123' and 
output timestamp in the same format.</li>
+      </ul>
+      </td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/dev/table/connectors/formats/json.zh.md 
b/docs/dev/table/connectors/formats/json.zh.md
index e4a0df9..181f48e 100644
--- a/docs/dev/table/connectors/formats/json.zh.md
+++ b/docs/dev/table/connectors/formats/json.zh.md
@@ -103,6 +103,18 @@ Format Options
       <td>Skip fields and rows with parse errors instead of failing.
       Fields are set to null in case of errors.</td>
     </tr>
+    <tr>
+      <td><h5>json.timestamp-format.standard</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;"><code>'SQL'</code></td>
+      <td>String</td>
+      <td>Specify the input and output timestamp format. Currently supported 
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+      <ul>
+        <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123' and 
output timestamp in the same format.</li>
+        <li>Option <code>'ISO-8601'</code> will parse input timestamp in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123' and 
output timestamp in the same format.</li>
+      </ul>
+      </td>
+    </tr>
     </tbody>
 </table>
 
diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
index 41a67f6..26cf90a 100644
--- 
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -107,12 +107,12 @@ public class Elasticsearch6DynamicSinkITCase {
                Map<String, Object> response = client.get(new GetRequest(index, 
myType, "1_2012-12-12T12:12:12")).actionGet().getSource();
                Map<Object, Object> expectedMap = new HashMap<>();
                expectedMap.put("a", 1);
-               expectedMap.put("b", "00:00:12Z");
+               expectedMap.put("b", "00:00:12");
                expectedMap.put("c", "ABCDE");
                expectedMap.put("d", 12.12d);
                expectedMap.put("e", 2);
                expectedMap.put("f", "2003-10-20");
-               expectedMap.put("g", "2012-12-12T12:12:12Z");
+               expectedMap.put("g", "2012-12-12 12:12:12");
                assertThat(response, equalTo(expectedMap));
        }
 
@@ -165,12 +165,12 @@ public class Elasticsearch6DynamicSinkITCase {
                        .getSource();
                Map<Object, Object> expectedMap = new HashMap<>();
                expectedMap.put("a", 1);
-               expectedMap.put("b", "00:00:12Z");
+               expectedMap.put("b", "00:00:12");
                expectedMap.put("c", "ABCDE");
                expectedMap.put("d", 12.12d);
                expectedMap.put("e", 2);
                expectedMap.put("f", "2003-10-20");
-               expectedMap.put("g", "2012-12-12T12:12:12Z");
+               expectedMap.put("g", "2012-12-12 12:12:12");
                assertThat(response, equalTo(expectedMap));
        }
 
@@ -238,12 +238,12 @@ public class Elasticsearch6DynamicSinkITCase {
                Map<String, Object> result = hits.getAt(0).getSourceAsMap();
                Map<Object, Object> expectedMap = new HashMap<>();
                expectedMap.put("a", 1);
-               expectedMap.put("b", "00:00:12Z");
+               expectedMap.put("b", "00:00:12");
                expectedMap.put("c", "ABCDE");
                expectedMap.put("d", 12.12d);
                expectedMap.put("e", 2);
                expectedMap.put("f", "2003-10-20");
-               expectedMap.put("g", "2012-12-12T12:12:12Z");
+               expectedMap.put("g", "2012-12-12 12:12:12");
                assertThat(result, equalTo(expectedMap));
        }
 
diff --git 
a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
index dbd1ff9..7f41eb7 100644
--- 
a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -105,12 +105,12 @@ public class Elasticsearch7DynamicSinkITCase {
                Map<String, Object> response = client.get(new GetRequest(index, 
"1_2012-12-12T12:12:12")).actionGet().getSource();
                Map<Object, Object> expectedMap = new HashMap<>();
                expectedMap.put("a", 1);
-               expectedMap.put("b", "00:00:12Z");
+               expectedMap.put("b", "00:00:12");
                expectedMap.put("c", "ABCDE");
                expectedMap.put("d", 12.12d);
                expectedMap.put("e", 2);
                expectedMap.put("f", "2003-10-20");
-               expectedMap.put("g", "2012-12-12T12:12:12Z");
+               expectedMap.put("g", "2012-12-12 12:12:12");
                assertThat(response, equalTo(expectedMap));
        }
 
@@ -159,12 +159,12 @@ public class Elasticsearch7DynamicSinkITCase {
                Map<String, Object> response = client.get(new GetRequest(index, 
"1_2012-12-12T12:12:12")).actionGet().getSource();
                Map<Object, Object> expectedMap = new HashMap<>();
                expectedMap.put("a", 1);
-               expectedMap.put("b", "00:00:12Z");
+               expectedMap.put("b", "00:00:12");
                expectedMap.put("c", "ABCDE");
                expectedMap.put("d", 12.12d);
                expectedMap.put("e", 2);
                expectedMap.put("f", "2003-10-20");
-               expectedMap.put("g", "2012-12-12T12:12:12Z");
+               expectedMap.put("g", "2012-12-12 12:12:12");
                assertThat(response, equalTo(expectedMap));
        }
 
@@ -230,12 +230,12 @@ public class Elasticsearch7DynamicSinkITCase {
                Map<String, Object> result = hits.getAt(0).getSourceAsMap();
                Map<Object, Object> expectedMap = new HashMap<>();
                expectedMap.put("a", 1);
-               expectedMap.put("b", "00:00:12Z");
+               expectedMap.put("b", "00:00:12");
                expectedMap.put("c", "ABCDE");
                expectedMap.put("d", 12.12d);
                expectedMap.put("e", 2);
                expectedMap.put("f", "2003-10-20");
-               expectedMap.put("g", "2012-12-12T12:12:12Z");
+               expectedMap.put("g", "2012-12-12 12:12:12");
                assertThat(result, equalTo(expectedMap));
        }
 
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
index 9d8d1a2..8671576 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -141,14 +141,14 @@ public class SQLClientKafkaITCase extends TestLogger {
                        String testAvroTopic = "test-avro-" + kafkaVersion + 
"-" + UUID.randomUUID().toString();
                        kafka.createTopic(1, 1, testJsonTopic);
                        String[] messages = new String[]{
-                                       "{\"rowtime\": 
\"2018-03-12T08:00:00Z\", \"user\": \"Alice\", \"event\": { \"type\": 
\"WARNING\", \"message\": \"This is a warning.\"}}",
-                                       "{\"rowtime\": 
\"2018-03-12T08:10:00Z\", \"user\": \"Alice\", \"event\": { \"type\": 
\"WARNING\", \"message\": \"This is a warning.\"}}",
-                                       "{\"rowtime\": 
\"2018-03-12T09:00:00Z\", \"user\": \"Bob\", \"event\": { \"type\": 
\"WARNING\", \"message\": \"This is another warning.\"}}",
-                                       "{\"rowtime\": 
\"2018-03-12T09:10:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"INFO\", 
\"message\": \"This is a info.\"}}",
-                                       "{\"rowtime\": 
\"2018-03-12T09:20:00Z\", \"user\": \"Steve\", \"event\": { \"type\": \"INFO\", 
\"message\": \"This is another info.\"}}",
-                                       "{\"rowtime\": 
\"2018-03-12T09:30:00Z\", \"user\": \"Steve\", \"event\": { \"type\": \"INFO\", 
\"message\": \"This is another info.\"}}",
-                                       "{\"rowtime\": 
\"2018-03-12T09:30:00Z\", \"user\": null, \"event\": { \"type\": \"WARNING\", 
\"message\": \"This is a bad message because the user is missing.\"}}",
-                                       "{\"rowtime\": 
\"2018-03-12T10:40:00Z\", \"user\": \"Bob\", \"event\": { \"type\": \"ERROR\", 
\"message\": \"This is an error.\"}}"
+                                       "{\"rowtime\": \"2018-03-12 08:00:00\", 
\"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is 
a warning.\"}}",
+                                       "{\"rowtime\": \"2018-03-12 08:10:00\", 
\"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is 
a warning.\"}}",
+                                       "{\"rowtime\": \"2018-03-12 09:00:00\", 
\"user\": \"Bob\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is 
another warning.\"}}",
+                                       "{\"rowtime\": \"2018-03-12 09:10:00\", 
\"user\": \"Alice\", \"event\": { \"type\": \"INFO\", \"message\": \"This is a 
info.\"}}",
+                                       "{\"rowtime\": \"2018-03-12 09:20:00\", 
\"user\": \"Steve\", \"event\": { \"type\": \"INFO\", \"message\": \"This is 
another info.\"}}",
+                                       "{\"rowtime\": \"2018-03-12 09:30:00\", 
\"user\": \"Steve\", \"event\": { \"type\": \"INFO\", \"message\": \"This is 
another info.\"}}",
+                                       "{\"rowtime\": \"2018-03-12 09:30:00\", 
\"user\": null, \"event\": { \"type\": \"WARNING\", \"message\": \"This is a 
bad message because the user is missing.\"}}",
+                                       "{\"rowtime\": \"2018-03-12 10:40:00\", 
\"user\": \"Bob\", \"event\": { \"type\": \"ERROR\", \"message\": \"This is an 
error.\"}}"
                        };
                        kafka.sendMessages(testJsonTopic, messages);
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
index d464634..1ecdc73 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
@@ -47,6 +47,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.flink.formats.json.JsonFormatFactory.validateFormatOptions;
 import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD;
 import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT;
 
 /**
  * Factory to build reader/writer to read/write json format file.
@@ -70,6 +71,7 @@ public class JsonFileSystemFormatFactory implements 
FileSystemFormatFactory {
                Set<ConfigOption<?>> options = new HashSet<>();
                options.add(FAIL_ON_MISSING_FIELD);
                options.add(IGNORE_PARSE_ERRORS);
+               options.add(TIMESTAMP_FORMAT);
                return options;
        }
 
@@ -79,13 +81,15 @@ public class JsonFileSystemFormatFactory implements 
FileSystemFormatFactory {
                validateFormatOptions(options);
                boolean failOnMissingField = options.get(FAIL_ON_MISSING_FIELD);
                boolean ignoreParseErrors = options.get(IGNORE_PARSE_ERRORS);
+               TimestampFormat timestampOption = 
JsonOptions.getTimestampFormat(options);
 
                RowType formatRowType = context.getFormatRowType();
                JsonRowDataDeserializationSchema deserializationSchema = new 
JsonRowDataDeserializationSchema(
                        formatRowType,
                        new GenericTypeInfo(GenericRowData.class),
                        failOnMissingField,
-                       ignoreParseErrors);
+                       ignoreParseErrors,
+                       timestampOption);
 
                String[] fieldNames = context.getSchema().getFieldNames();
                List<String> projectFields = 
Arrays.stream(context.getProjectFields())
@@ -117,7 +121,8 @@ public class JsonFileSystemFormatFactory implements 
FileSystemFormatFactory {
 
        @Override
        public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
-               return Optional.of(new JsonRowDataEncoder(new 
JsonRowDataSerializationSchema(context.getFormatRowType())));
+               return Optional.of(new JsonRowDataEncoder(new 
JsonRowDataSerializationSchema(context.getFormatRowType(),
+                       
JsonOptions.getTimestampFormat(context.getFormatOptions()))));
        }
 
        @Override
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
index ba98867..57952b9 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -43,6 +43,8 @@ import java.util.Set;
 
 import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD;
 import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT_ENUM;
 
 /**
  * Table format factory for providing configured instances of JSON to RowData
@@ -64,6 +66,7 @@ public class JsonFormatFactory implements
 
                final boolean failOnMissingField = 
formatOptions.get(FAIL_ON_MISSING_FIELD);
                final boolean ignoreParseErrors = 
formatOptions.get(IGNORE_PARSE_ERRORS);
+               TimestampFormat timestampOption = 
JsonOptions.getTimestampFormat(formatOptions);
 
                return new DecodingFormat<DeserializationSchema<RowData>>() {
                        @Override
@@ -77,7 +80,9 @@ public class JsonFormatFactory implements
                                                rowType,
                                                rowDataTypeInfo,
                                                failOnMissingField,
-                                               ignoreParseErrors);
+                                               ignoreParseErrors,
+                                               timestampOption
+                                       );
                        }
 
                        @Override
@@ -93,13 +98,15 @@ public class JsonFormatFactory implements
                        ReadableConfig formatOptions) {
                FactoryUtil.validateFactoryOptions(this, formatOptions);
 
+               TimestampFormat timestampOption = 
JsonOptions.getTimestampFormat(formatOptions);
+
                return new EncodingFormat<SerializationSchema<RowData>>() {
                        @Override
                        public SerializationSchema<RowData> 
createRuntimeEncoder(
                                        DynamicTableSink.Context context,
                                        DataType consumedDataType) {
                                final RowType rowType = (RowType) 
consumedDataType.getLogicalType();
-                               return new 
JsonRowDataSerializationSchema(rowType);
+                               return new 
JsonRowDataSerializationSchema(rowType, timestampOption);
                        }
 
                        @Override
@@ -124,6 +131,7 @@ public class JsonFormatFactory implements
                Set<ConfigOption<?>> options = new HashSet<>();
                options.add(FAIL_ON_MISSING_FIELD);
                options.add(IGNORE_PARSE_ERRORS);
+               options.add(TIMESTAMP_FORMAT);
                return options;
        }
 
@@ -134,11 +142,16 @@ public class JsonFormatFactory implements
        static void validateFormatOptions(ReadableConfig tableOptions) {
                boolean failOnMissingField = 
tableOptions.get(FAIL_ON_MISSING_FIELD);
                boolean ignoreParseErrors = 
tableOptions.get(IGNORE_PARSE_ERRORS);
+               String timestampFormat = tableOptions.get(TIMESTAMP_FORMAT);
                if (ignoreParseErrors && failOnMissingField) {
                        throw new 
ValidationException(FAIL_ON_MISSING_FIELD.key()
                                        + " and "
                                        + IGNORE_PARSE_ERRORS.key()
                                        + " shouldn't both be true.");
                }
+               if (!TIMESTAMP_FORMAT_ENUM.contains(timestampFormat)){
+                       throw new 
ValidationException(String.format("Unsupported value '%s' for %s. Supported 
values are [SQL, ISO-8601].",
+                               timestampFormat, TIMESTAMP_FORMAT.key()));
+               }
        }
 }
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
index dca8c16..6fc726b 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
@@ -20,6 +20,12 @@ package org.apache.flink.formats.json;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * This class holds configuration constants used by json format.
@@ -38,4 +44,41 @@ public class JsonOptions {
                        .defaultValue(false)
                        .withDescription("Optional flag to skip fields and rows 
with parse errors instead of failing;\n"
                                        + "fields are set to null in case of 
errors, false by default");
+
+       public static final ConfigOption<String> TIMESTAMP_FORMAT = 
ConfigOptions
+                       .key("timestamp-format.standard")
+                       .stringType()
+                       .defaultValue("SQL")
+                       .withDescription("Optional flag to specify timestamp 
format, SQL by default." +
+                               " Option ISO-8601 will parse input timestamp in 
\"yyyy-MM-ddTHH:mm:ss.s{precision}\" format and output timestamp in the same 
format." +
+                               " Option SQL will parse input timestamp in 
\"yyyy-MM-dd HH:mm:ss.s{precision}\" format and output timestamp in the same 
format.");
+
+       // 
--------------------------------------------------------------------------------------------
+       // Option enumerations
+       // 
--------------------------------------------------------------------------------------------
+
+       public static final String SQL = "SQL";
+       public static final String ISO_8601 = "ISO-8601";
+
+       public static final Set<String> TIMESTAMP_FORMAT_ENUM = new 
HashSet<>(Arrays.asList(
+               SQL,
+               ISO_8601
+       ));
+
+       // 
--------------------------------------------------------------------------------------------
+       // Utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       public static TimestampFormat getTimestampFormat(ReadableConfig config){
+               String timestampFormat = config.get(TIMESTAMP_FORMAT);
+               switch (timestampFormat){
+                       case SQL:
+                               return TimestampFormat.SQL;
+                       case ISO_8601:
+                               return TimestampFormat.ISO_8601;
+                       default:
+                               throw new TableException(
+                                       String.format("Unsupported timestamp 
format '%s'. Validator should have checked that.", timestampFormat));
+               }
+       }
 }
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 87dee7f..d66ecce 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.json;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
@@ -50,7 +51,6 @@ import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
-import java.time.ZoneOffset;
 import java.time.temporal.TemporalAccessor;
 import java.time.temporal.TemporalQueries;
 import java.util.HashMap;
@@ -60,8 +60,9 @@ import java.util.Objects;
 
 import static java.lang.String.format;
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
-import static 
org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
-import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static 
org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.SQL_TIME_FORMAT;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -93,11 +94,15 @@ public class JsonRowDataDeserializationSchema implements 
DeserializationSchema<R
        /** Object mapper for parsing the JSON. */
        private final ObjectMapper objectMapper = new ObjectMapper();
 
+       /** Timestamp format specification which is used to parse timestamp. */
+       private final TimestampFormat timestampFormat;
+
        public JsonRowDataDeserializationSchema(
                        RowType rowType,
                        TypeInformation<RowData> resultTypeInfo,
                        boolean failOnMissingField,
-                       boolean ignoreParseErrors) {
+                       boolean ignoreParseErrors,
+                       TimestampFormat timestampFormat) {
                if (ignoreParseErrors && failOnMissingField) {
                        throw new IllegalArgumentException(
                                "JSON format doesn't support failOnMissingField 
and ignoreParseErrors are both enabled.");
@@ -106,6 +111,7 @@ public class JsonRowDataDeserializationSchema implements 
DeserializationSchema<R
                this.failOnMissingField = failOnMissingField;
                this.ignoreParseErrors = ignoreParseErrors;
                this.runtimeConverter = 
createRowConverter(checkNotNull(rowType));
+               this.timestampFormat = timestampFormat;
        }
 
        @Override
@@ -142,12 +148,13 @@ public class JsonRowDataDeserializationSchema implements 
DeserializationSchema<R
                JsonRowDataDeserializationSchema that = 
(JsonRowDataDeserializationSchema) o;
                return failOnMissingField == that.failOnMissingField &&
                                ignoreParseErrors == that.ignoreParseErrors &&
-                               resultTypeInfo.equals(that.resultTypeInfo);
+                               resultTypeInfo.equals(that.resultTypeInfo) &&
+                               timestampFormat.equals(that.timestampFormat);
        }
 
        @Override
        public int hashCode() {
-               return Objects.hash(failOnMissingField, ignoreParseErrors, 
resultTypeInfo);
+               return Objects.hash(failOnMissingField, ignoreParseErrors, 
resultTypeInfo, timestampFormat);
        }
 
        // 
-------------------------------------------------------------------------------------
@@ -193,7 +200,6 @@ public class JsonRowDataDeserializationSchema implements 
DeserializationSchema<R
                                return this::convertToDate;
                        case TIME_WITHOUT_TIME_ZONE:
                                return this::convertToTime;
-                       case TIMESTAMP_WITH_TIME_ZONE:
                        case TIMESTAMP_WITHOUT_TIME_ZONE:
                                return this::convertToTimestamp;
                        case FLOAT:
@@ -272,37 +278,25 @@ public class JsonRowDataDeserializationSchema implements 
DeserializationSchema<R
        }
 
        private int convertToTime(JsonNode jsonNode) {
-               // according to RFC 3339 every full-time must have a timezone;
-               // until we have full timezone support, we only support UTC;
-               // users can parse their time as string as a workaround
-               TemporalAccessor parsedTime = 
RFC3339_TIME_FORMAT.parse(jsonNode.asText());
-
-               ZoneOffset zoneOffset = 
parsedTime.query(TemporalQueries.offset());
+               TemporalAccessor parsedTime = 
SQL_TIME_FORMAT.parse(jsonNode.asText());
                LocalTime localTime = 
parsedTime.query(TemporalQueries.localTime());
 
-               if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0 || 
localTime.getNano() != 0) {
-                       throw new JsonParseException(
-                               "Invalid time format. Only a time in UTC 
timezone without milliseconds is supported yet.");
-               }
-
                // get number of milliseconds of the day
                return localTime.toSecondOfDay() * 1000;
        }
 
        private TimestampData convertToTimestamp(JsonNode jsonNode) {
-               // according to RFC 3339 every date-time must have a timezone;
-               // until we have full timezone support, we only support UTC;
-               // users can parse their time as string as a workaround
-               TemporalAccessor parsedTimestamp = 
RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText());
-
-               ZoneOffset zoneOffset = 
parsedTimestamp.query(TemporalQueries.offset());
-
-               if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0) {
-                       throw new JsonParseException(
-                               "Invalid timestamp format. Only a timestamp in 
UTC timezone is supported yet. " +
-                                       "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+               TemporalAccessor parsedTimestamp;
+               switch (timestampFormat){
+                       case SQL:
+                               parsedTimestamp = 
SQL_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+                               break;
+                       case ISO_8601:
+                               parsedTimestamp = 
ISO8601_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+                               break;
+                       default:
+                               throw new 
TableException(String.format("Unsupported timestamp format '%s'. Validator 
should have checked that.", timestampFormat));
                }
-
                LocalTime localTime = 
parsedTimestamp.query(TemporalQueries.localTime());
                LocalDate localDate = 
parsedTimestamp.query(TemporalQueries.localDate());
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 263e282..89b3b87 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -20,6 +20,7 @@ package org.apache.flink.formats.json;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.MapData;
@@ -45,8 +46,9 @@ import java.util.Arrays;
 import java.util.Objects;
 
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
-import static 
org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
-import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static 
org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.SQL_TIME_FORMAT;
 
 /**
  * Serialization schema that serializes an object of Flink internal data 
structure into a JSON bytes.
@@ -72,8 +74,12 @@ public class JsonRowDataSerializationSchema implements 
SerializationSchema<RowDa
        /** Reusable object node. */
        private transient ObjectNode node;
 
-       public JsonRowDataSerializationSchema(RowType rowType) {
+       /** Timestamp format specification which is used to format timestamp. */
+       private final TimestampFormat timestampFormat;
+
+       public JsonRowDataSerializationSchema(RowType rowType, TimestampFormat 
timestampFormat) {
                this.rowType = rowType;
+               this.timestampFormat = timestampFormat;
                this.runtimeConverter = createConverter(rowType);
        }
 
@@ -101,12 +107,12 @@ public class JsonRowDataSerializationSchema implements 
SerializationSchema<RowDa
                        return false;
                }
                JsonRowDataSerializationSchema that = 
(JsonRowDataSerializationSchema) o;
-               return rowType.equals(that.rowType);
+               return rowType.equals(that.rowType) && 
timestampFormat.equals(that.timestampFormat);
        }
 
        @Override
        public int hashCode() {
-               return Objects.hash(rowType);
+               return Objects.hash(rowType, timestampFormat);
        }
 
        // 
--------------------------------------------------------------------------------
@@ -162,7 +168,6 @@ public class JsonRowDataSerializationSchema implements 
SerializationSchema<RowDa
                                return createDateConverter();
                        case TIME_WITHOUT_TIME_ZONE:
                                return createTimeConverter();
-                       case TIMESTAMP_WITH_TIME_ZONE:
                        case TIMESTAMP_WITHOUT_TIME_ZONE:
                                return createTimestampConverter();
                        case DECIMAL:
@@ -199,16 +204,27 @@ public class JsonRowDataSerializationSchema implements 
SerializationSchema<RowDa
                return (mapper, reuse, value) -> {
                        int millisecond = (int) value;
                        LocalTime time = LocalTime.ofSecondOfDay(millisecond / 
1000L);
-                       return 
mapper.getNodeFactory().textNode(RFC3339_TIME_FORMAT.format(time));
+                       return 
mapper.getNodeFactory().textNode(SQL_TIME_FORMAT.format(time));
                };
        }
 
        private SerializationRuntimeConverter createTimestampConverter() {
-               return (mapper, reuse, value) -> {
-                       TimestampData timestamp = (TimestampData) value;
-                       return mapper.getNodeFactory()
-                               
.textNode(RFC3339_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime()));
-               };
+               switch (timestampFormat){
+                       case ISO_8601:
+                               return (mapper, reuse, value) -> {
+                                       TimestampData timestamp = 
(TimestampData) value;
+                                       return mapper.getNodeFactory()
+                                               
.textNode(ISO8601_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime()));
+                               };
+                       case SQL:
+                               return (mapper, reuse, value) -> {
+                                       TimestampData timestamp = 
(TimestampData) value;
+                                       return mapper.getNodeFactory()
+                                               
.textNode(SQL_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime()));
+                               };
+                       default:
+                               throw new TableException(String.format("Unsupported timestamp 
format '%s'. Validator should have checked that.", timestampFormat));
+               }
        }
 
        private SerializationRuntimeConverter createArrayConverter(ArrayType 
type) {
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
index c946c5d..693781b 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
@@ -23,7 +23,7 @@ import java.time.format.DateTimeFormatterBuilder;
 import java.time.temporal.ChronoField;
 
 /**
- * Time formats respecting the RFC3339 specification.
+ * Time formats and timestamp formats respecting the RFC3339 specification, 
ISO-8601 specification and SQL specification.
  */
 class TimeFormats {
 
@@ -41,6 +41,22 @@ class TimeFormats {
                .append(RFC3339_TIME_FORMAT)
                .toFormatter();
 
+       /** Formatter for ISO8601 string representation of a timestamp value 
(without UTC timezone). */
+       static final DateTimeFormatter ISO8601_TIMESTAMP_FORMAT = 
DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+
+       /** Formatter for SQL string representation of a time value. */
+       static final DateTimeFormatter SQL_TIME_FORMAT = new 
DateTimeFormatterBuilder()
+               .appendPattern("HH:mm:ss")
+               .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+               .toFormatter();
+
+       /** Formatter for SQL string representation of a timestamp value 
(without UTC timezone). */
+       static final DateTimeFormatter SQL_TIMESTAMP_FORMAT = new 
DateTimeFormatterBuilder()
+               .append(DateTimeFormatter.ISO_LOCAL_DATE)
+               .appendLiteral(' ')
+               .append(SQL_TIME_FORMAT)
+               .toFormatter();
+
        private TimeFormats() {
        }
 }
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimestampFormat.java
similarity index 52%
copy from 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
copy to 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimestampFormat.java
index dca8c16..0c31fd4 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimestampFormat.java
@@ -18,24 +18,18 @@
 
 package org.apache.flink.formats.json;
 
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.annotation.Internal;
 
 /**
- * This class holds configuration constants used by json format.
+ * Timestamp format Enums.
  */
-public class JsonOptions {
+@Internal
+public enum TimestampFormat {
+       /** Option to specify the SQL timestamp format. It parses timestamps in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format
+        * and outputs timestamps in the same format. */
+       SQL,
 
-       public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = 
ConfigOptions
-                       .key("fail-on-missing-field")
-                       .booleanType()
-                       .defaultValue(false)
-                       .withDescription("Optional flag to specify whether to 
fail if a field is missing or not, false by default");
-
-       public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = 
ConfigOptions
-                       .key("ignore-parse-errors")
-                       .booleanType()
-                       .defaultValue(false)
-                       .withDescription("Optional flag to skip fields and rows 
with parse errors instead of failing;\n"
-                                       + "fields are set to null in case of 
errors, false by default");
+       /** Option to specify the ISO-8601 timestamp format. It parses timestamps in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format
+        * and outputs timestamps in the same format. */
+       ISO_8601
 }
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
index 4c03d04..761a0f4 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.json.canal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.GenericRowData;
@@ -70,7 +71,8 @@ public final class CanalJsonDeserializationSchema implements 
DeserializationSche
        public CanalJsonDeserializationSchema(
                        RowType rowType,
                        TypeInformation<RowData> resultTypeInfo,
-                       boolean ignoreParseErrors) {
+                       boolean ignoreParseErrors,
+                       TimestampFormat timestampFormatOption) {
                this.resultTypeInfo = resultTypeInfo;
                this.ignoreParseErrors = ignoreParseErrors;
                this.fieldCount = rowType.getFieldCount();
@@ -79,7 +81,8 @@ public final class CanalJsonDeserializationSchema implements 
DeserializationSche
                        // the result type is never used, so it's fine to pass 
in Canal's result type
                        resultTypeInfo,
                        false, // ignoreParseErrors already contains the 
functionality of failOnMissingField
-                       ignoreParseErrors);
+                       ignoreParseErrors,
+                       timestampFormatOption);
 
        }
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
index fba98bf..cedff80 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
@@ -22,8 +22,9 @@ import 
org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
@@ -48,12 +49,9 @@ public class CanalJsonFormatFactory implements 
DeserializationFormatFactory, Ser
 
        public static final String IDENTIFIER = "canal-json";
 
-       public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = 
ConfigOptions
-               .key("ignore-parse-errors")
-               .booleanType()
-               .defaultValue(false)
-               .withDescription("Optional flag to skip fields and rows with 
parse errors instead of failing, " +
-                       "fields are set to null in case of errors. Default is 
false.");
+       public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = 
JsonOptions.IGNORE_PARSE_ERRORS;
+
+       public static final ConfigOption<String> TIMESTAMP_FORMAT = 
JsonOptions.TIMESTAMP_FORMAT;
 
        @SuppressWarnings("unchecked")
        @Override
@@ -62,6 +60,7 @@ public class CanalJsonFormatFactory implements 
DeserializationFormatFactory, Ser
                        ReadableConfig formatOptions) {
                FactoryUtil.validateFactoryOptions(this, formatOptions);
                final boolean ignoreParseErrors = 
formatOptions.get(IGNORE_PARSE_ERRORS);
+               TimestampFormat timestampFormatOption = 
JsonOptions.getTimestampFormat(formatOptions);
 
                return new DecodingFormat<DeserializationSchema<RowData>>() {
                        @Override
@@ -73,7 +72,8 @@ public class CanalJsonFormatFactory implements 
DeserializationFormatFactory, Ser
                                return new CanalJsonDeserializationSchema(
                                        rowType,
                                        rowDataTypeInfo,
-                                       ignoreParseErrors);
+                                       ignoreParseErrors,
+                                       timestampFormatOption);
                        }
 
                        @Override
@@ -109,6 +109,7 @@ public class CanalJsonFormatFactory implements 
DeserializationFormatFactory, Ser
        public Set<ConfigOption<?>> optionalOptions() {
                Set<ConfigOption<?>> options = new HashSet<>();
                options.add(IGNORE_PARSE_ERRORS);
+               options.add(TIMESTAMP_FORMAT);
                return options;
        }
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
index f0ae9d9..d1f1d96 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -78,7 +79,8 @@ public final class DebeziumJsonDeserializationSchema 
implements DeserializationS
                        RowType rowType,
                        TypeInformation<RowData> resultTypeInfo,
                        boolean schemaInclude,
-                       boolean ignoreParseErrors) {
+                       boolean ignoreParseErrors,
+                       TimestampFormat timestampFormatOption) {
                this.resultTypeInfo = resultTypeInfo;
                this.schemaInclude = schemaInclude;
                this.ignoreParseErrors = ignoreParseErrors;
@@ -87,7 +89,8 @@ public final class DebeziumJsonDeserializationSchema 
implements DeserializationS
                        // the result type is never used, so it's fine to pass 
in Debezium's result type
                        resultTypeInfo,
                        false, // ignoreParseErrors already contains the 
functionality of failOnMissingField
-                       ignoreParseErrors);
+                       ignoreParseErrors,
+                       timestampFormatOption);
        }
 
        @Override
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
index 3458014..17c6ab2 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
@@ -57,12 +59,9 @@ public class DebeziumJsonFormatFactory implements 
DeserializationFormatFactory,
                        "This option indicates the Debezium JSON data include 
the schema in the message or not. " +
                        "Default is false.");
 
-       public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = 
ConfigOptions
-               .key("ignore-parse-errors")
-               .booleanType()
-               .defaultValue(false)
-               .withDescription("Optional flag to skip fields and rows with 
parse errors instead of failing, " +
-                       "fields are set to null in case of errors. Default is 
false.");
+       public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = 
JsonOptions.IGNORE_PARSE_ERRORS;
+
+       public static final ConfigOption<String> TIMESTAMP_FORMAT = 
JsonOptions.TIMESTAMP_FORMAT;
 
        @SuppressWarnings("unchecked")
        @Override
@@ -72,6 +71,7 @@ public class DebeziumJsonFormatFactory implements 
DeserializationFormatFactory,
                FactoryUtil.validateFactoryOptions(this, formatOptions);
                final boolean schemaInclude = formatOptions.get(SCHEMA_INCLUDE);
                final boolean ignoreParseErrors = 
formatOptions.get(IGNORE_PARSE_ERRORS);
+               TimestampFormat timestampFormatOption = 
JsonOptions.getTimestampFormat(formatOptions);
 
                return new DecodingFormat<DeserializationSchema<RowData>>() {
                        @Override
@@ -84,7 +84,8 @@ public class DebeziumJsonFormatFactory implements 
DeserializationFormatFactory,
                                        rowType,
                                        rowDataTypeInfo,
                                        schemaInclude,
-                                       ignoreParseErrors);
+                                       ignoreParseErrors,
+                                       timestampFormatOption);
                        }
 
                        @Override
@@ -121,6 +122,7 @@ public class DebeziumJsonFormatFactory implements 
DeserializationFormatFactory,
                Set<ConfigOption<?>> options = new HashSet<>();
                options.add(SCHEMA_INCLUDE);
                options.add(IGNORE_PARSE_ERRORS);
+               options.add(TIMESTAMP_FORMAT);
                return options;
        }
 
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
index c12eecf..568563e 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
@@ -91,6 +91,25 @@ public class JsonFormatFactoryTest extends TestLogger {
                testSchemaDeserializationSchema(tableOptions);
        }
 
+       @Test
+       public void testInvalidOptionForTimestampFormat() {
+               final Map<String, String> tableOptions = getModifyOptions(
+                       options -> 
options.put("json.timestamp-format.standard", "test"));
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new 
ValidationException("Unsupported value 'test' for timestamp-format.standard. 
Supported values are [SQL, ISO-8601].")));
+               testSchemaDeserializationSchema(tableOptions);
+       }
+
+       @Test
+       public void testLowerCaseOptionForTimestampFormat() {
+               final Map<String, String> tableOptions = getModifyOptions(
+                       options -> 
options.put("json.timestamp-format.standard", "iso-8601"));
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new 
ValidationException("Unsupported value 'iso-8601' for 
timestamp-format.standard. Supported values are [SQL, ISO-8601].")));
+               testSchemaDeserializationSchema(tableOptions);
+       }
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
@@ -101,7 +120,8 @@ public class JsonFormatFactoryTest extends TestLogger {
                                                ROW_TYPE,
                                                new RowDataTypeInfo(ROW_TYPE),
                                                false,
-                                               true);
+                                               true,
+                                               TimestampFormat.ISO_8601);
 
                final DynamicTableSource actualSource = 
createTableSource(options);
                assert actualSource instanceof 
TestDynamicTableFactory.DynamicTableSourceMock;
@@ -117,7 +137,8 @@ public class JsonFormatFactoryTest extends TestLogger {
        }
 
        private void testSchemaSerializationSchema(Map<String, String> options) 
{
-               final JsonRowDataSerializationSchema expectedSer = new 
JsonRowDataSerializationSchema(ROW_TYPE);
+               final JsonRowDataSerializationSchema expectedSer = new 
JsonRowDataSerializationSchema(ROW_TYPE,
+                       TimestampFormat.ISO_8601);
 
                final DynamicTableSink actualSink = createTableSink(options);
                assert actualSink instanceof 
TestDynamicTableFactory.DynamicTableSinkMock;
@@ -152,6 +173,7 @@ public class JsonFormatFactoryTest extends TestLogger {
                options.put("format", JsonFormatFactory.IDENTIFIER);
                options.put("json.fail-on-missing-field", "false");
                options.put("json.ignore-parse-errors", "true");
+               options.put("json.timestamp-format.standard", "ISO-8601");
                return options;
        }
 
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index dcfae9c..fedfbea 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -114,9 +114,9 @@ public class JsonRowDataSerDeSchemaTest {
                root.put("decimal", decimal);
                root.set("doubles", doubleNode);
                root.put("date", "1990-10-14");
-               root.put("time", "12:12:43Z");
-               root.put("timestamp3", "1990-10-14T12:12:43.123Z");
-               root.put("timestamp9", "1990-10-14T12:12:43.123456789Z");
+               root.put("time", "12:12:43");
+               root.put("timestamp3", "1990-10-14T12:12:43.123");
+               root.put("timestamp9", "1990-10-14T12:12:43.123456789");
                root.putObject("map").put("flink", 123);
                root.putObject("map2map").putObject("inner_map").put("key", 
234);
 
@@ -143,7 +143,7 @@ public class JsonRowDataSerDeSchemaTest {
                RowDataTypeInfo resultTypeInfo = new RowDataTypeInfo(schema);
 
                JsonRowDataDeserializationSchema deserializationSchema = new 
JsonRowDataDeserializationSchema(
-                       schema, resultTypeInfo, false, false);
+                       schema, resultTypeInfo, false, false, 
TimestampFormat.ISO_8601);
 
                Row expected = new Row(16);
                expected.setField(0, true);
@@ -168,7 +168,7 @@ public class JsonRowDataSerDeSchemaTest {
                assertEquals(expected, actual);
 
                // test serialization
-               JsonRowDataSerializationSchema serializationSchema = new 
JsonRowDataSerializationSchema(schema);
+               JsonRowDataSerializationSchema serializationSchema = new 
JsonRowDataSerializationSchema(schema,  TimestampFormat.ISO_8601);
 
                byte[] actualBytes = serializationSchema.serialize(rowData);
                assertEquals(new String(serializedJson), new 
String(actualBytes));
@@ -211,7 +211,7 @@ public class JsonRowDataSerDeSchemaTest {
                RowType rowType = (RowType) dataType.getLogicalType();
 
                JsonRowDataDeserializationSchema deserializationSchema = new 
JsonRowDataDeserializationSchema(
-                       rowType, new RowDataTypeInfo(rowType), false, false);
+                       rowType, new RowDataTypeInfo(rowType), false, false,  
TimestampFormat.ISO_8601);
 
                Row expected = new Row(7);
                expected.setField(0, bool);
@@ -236,8 +236,8 @@ public class JsonRowDataSerDeSchemaTest {
                ).getLogicalType();
 
                JsonRowDataDeserializationSchema deserializationSchema = new 
JsonRowDataDeserializationSchema(
-                       rowType, new RowDataTypeInfo(rowType), false, false);
-               JsonRowDataSerializationSchema serializationSchema = new 
JsonRowDataSerializationSchema(rowType);
+                       rowType, new RowDataTypeInfo(rowType), false, false,  
TimestampFormat.ISO_8601);
+               JsonRowDataSerializationSchema serializationSchema = new 
JsonRowDataSerializationSchema(rowType, TimestampFormat.ISO_8601);
 
                ObjectMapper objectMapper = new ObjectMapper();
 
@@ -290,8 +290,8 @@ public class JsonRowDataSerDeSchemaTest {
                ).getLogicalType();
 
                JsonRowDataDeserializationSchema deserializationSchema = new 
JsonRowDataDeserializationSchema(
-                       rowType, new RowDataTypeInfo(rowType), false, true);
-               JsonRowDataSerializationSchema serializationSchema = new 
JsonRowDataSerializationSchema(rowType);
+                       rowType, new RowDataTypeInfo(rowType), false, true, 
TimestampFormat.ISO_8601);
+               JsonRowDataSerializationSchema serializationSchema = new 
JsonRowDataSerializationSchema(rowType, TimestampFormat.ISO_8601);
 
                for (int i = 0; i < jsons.length; i++) {
                        String json = jsons[i];
@@ -315,7 +315,7 @@ public class JsonRowDataSerDeSchemaTest {
 
                // pass on missing field
                JsonRowDataDeserializationSchema deserializationSchema = new 
JsonRowDataDeserializationSchema(
-                       schema, new RowDataTypeInfo(schema), false, false);
+                       schema, new RowDataTypeInfo(schema), false, false, 
TimestampFormat.ISO_8601);
 
                Row expected = new Row(1);
                Row actual = 
convertToExternal(deserializationSchema.deserialize(serializedJson), dataType);
@@ -323,7 +323,7 @@ public class JsonRowDataSerDeSchemaTest {
 
                // fail on missing field
                deserializationSchema = deserializationSchema = new 
JsonRowDataDeserializationSchema(
-                       schema, new RowDataTypeInfo(schema), true, false);
+                       schema, new RowDataTypeInfo(schema), true, false, 
TimestampFormat.ISO_8601);
 
                thrown.expect(IOException.class);
                thrown.expectMessage("Failed to deserialize JSON 
'{\"id\":123123123}'");
@@ -331,7 +331,7 @@ public class JsonRowDataSerDeSchemaTest {
 
                // ignore on parse error
                deserializationSchema = new JsonRowDataDeserializationSchema(
-                       schema, new RowDataTypeInfo(schema), false, true);
+                       schema, new RowDataTypeInfo(schema), false, true, 
TimestampFormat.ISO_8601);
                actual = 
convertToExternal(deserializationSchema.deserialize(serializedJson), dataType);
                assertEquals(expected, actual);
 
@@ -340,7 +340,29 @@ public class JsonRowDataSerDeSchemaTest {
                // failOnMissingField and ignoreParseErrors both enabled
                //noinspection ConstantConditions
                new JsonRowDataDeserializationSchema(
-                       schema, new RowDataTypeInfo(schema), true, true);
+                       schema, new RowDataTypeInfo(schema), true, true, 
TimestampFormat.ISO_8601);
+       }
+
+       @Test
+       public void testSerDeSQLTimestampFormat() throws Exception{ // SQL-format timestamps must survive a deserialize -> serialize round trip
+               RowType rowType = (RowType) ROW(
+                       FIELD("timestamp3", TIMESTAMP(3)),
+                       FIELD("timestamp9", TIMESTAMP(9))
+               ).getLogicalType();
+
+               JsonRowDataDeserializationSchema deserializationSchema = new 
JsonRowDataDeserializationSchema(
+                       rowType, new RowDataTypeInfo(rowType), false, false, 
TimestampFormat.SQL);
+               JsonRowDataSerializationSchema serializationSchema = new 
JsonRowDataSerializationSchema(rowType, TimestampFormat.SQL);
+
+               ObjectMapper objectMapper = new ObjectMapper();
+
+               ObjectNode root = objectMapper.createObjectNode();
+               root.put("timestamp3", "1990-10-14 12:12:43.123"); // space separator, no 'T' and no zone suffix
+               root.put("timestamp9", "1990-10-14 12:12:43.123456789");
+               byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+               RowData rowData = 
deserializationSchema.deserialize(serializedJson);
+               byte[] actual = serializationSchema.serialize(rowData);
+               assertEquals(new String(serializedJson), new String(actual)); // serialized output must equal the input JSON text
        }
 
        @Test
@@ -356,7 +378,8 @@ public class JsonRowDataSerDeSchemaTest {
        private void testIgnoreParseErrors(TestSpec spec) throws Exception {
                // the parsing field should be null and no exception is thrown
                JsonRowDataDeserializationSchema ignoreErrorsSchema = new 
JsonRowDataDeserializationSchema(
-                       spec.rowType,  new RowDataTypeInfo(spec.rowType), 
false, true);
+                       spec.rowType,  new RowDataTypeInfo(spec.rowType), 
false, true,
+                       TimestampFormat.ISO_8601);
                Row expected;
                if (spec.expected != null) {
                        expected = spec.expected;
@@ -373,7 +396,8 @@ public class JsonRowDataSerDeSchemaTest {
        private void testParseErrors(TestSpec spec) throws Exception {
                // expect exception if parse error is not ignored
                JsonRowDataDeserializationSchema failingSchema = new 
JsonRowDataDeserializationSchema(
-                       spec.rowType,  new RowDataTypeInfo(spec.rowType), 
false, false);
+                       spec.rowType,  new RowDataTypeInfo(spec.rowType), 
false, false,
+                       spec.timestampFormat);
 
                thrown.expectMessage(spec.errorMessage);
                failingSchema.deserialize(spec.json.getBytes());
@@ -421,6 +445,16 @@ public class JsonRowDataSerDeSchemaTest {
                        .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"18:00:243\"}'"),
 
                TestSpec
+                       .json("{\"id\":\"18:00:243\"}")
+                       .rowType(ROW(FIELD("id", TIME())))
+                       .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"18:00:243\"}'"),
+
+               TestSpec
+                       .json("{\"id\":\"20191112\"}")
+                       .rowType(ROW(FIELD("id", DATE())))
+                       .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"20191112\"}'"),
+
+               TestSpec
                        .json("{\"id\":\"20191112\"}")
                        .rowType(ROW(FIELD("id", DATE())))
                        .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"20191112\"}'"),
@@ -428,9 +462,26 @@ public class JsonRowDataSerDeSchemaTest {
                TestSpec
                        .json("{\"id\":\"2019-11-12 18:00:12\"}")
                        .rowType(ROW(FIELD("id", TIMESTAMP(0))))
+                       .timestampFormat(TimestampFormat.ISO_8601)
                        .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"2019-11-12 18:00:12\"}'"),
 
                TestSpec
+                       .json("{\"id\":\"2019-11-12T18:00:12\"}")
+                       .rowType(ROW(FIELD("id", TIMESTAMP(0))))
+                       .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"2019-11-12T18:00:12\"}'"),
+
+               TestSpec
+                       .json("{\"id\":\"2019-11-12T18:00:12Z\"}")
+                       .rowType(ROW(FIELD("id", TIMESTAMP(0))))
+                       .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"2019-11-12T18:00:12Z\"}'"),
+
+               TestSpec
+                       .json("{\"id\":\"2019-11-12T18:00:12Z\"}")
+                       .rowType(ROW(FIELD("id", TIMESTAMP(0))))
+                       .timestampFormat(TimestampFormat.ISO_8601)
+                       .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"2019-11-12T18:00:12Z\"}'"),
+
+               TestSpec
                        .json("{\"id\":\"abc\"}")
                        .rowType(ROW(FIELD("id", DECIMAL(10, 3))))
                        .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"abc\"}'"),
@@ -471,6 +522,7 @@ public class JsonRowDataSerDeSchemaTest {
        private static class TestSpec {
                private final String json;
                private RowType rowType;
+               private TimestampFormat timestampFormat = TimestampFormat.SQL;
                private Row expected;
                private String errorMessage;
 
@@ -496,5 +548,10 @@ public class JsonRowDataSerDeSchemaTest {
                        this.errorMessage = errorMessage;
                        return this;
                }
+
+               TestSpec timestampFormat(TimestampFormat timestampFormat){ // overrides this spec's timestamp format; returns this for chaining
+                       this.timestampFormat = timestampFormat;
+                       return this;
+               }
        }
 }
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchemaTest.java
index 02f055e..36099ef 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchemaTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.json.canal;
 
+import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
@@ -65,7 +66,8 @@ public class CanalJsonDeserializationSchemaTest {
                CanalJsonDeserializationSchema deserializationSchema = new 
CanalJsonDeserializationSchema(
                        SCHEMA,
                        new RowDataTypeInfo(SCHEMA),
-                       false);
+                       false,
+                       TimestampFormat.ISO_8601);
 
                SimpleCollector collector = new SimpleCollector();
                for (String line : lines) {
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
index 9d64079..102ba88 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.formats.json.canal;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTableImpl;
@@ -65,7 +66,8 @@ public class CanalJsonFormatFactoryTest extends TestLogger {
                final CanalJsonDeserializationSchema expectedDeser = new 
CanalJsonDeserializationSchema(
                        ROW_TYPE,
                        new RowDataTypeInfo(ROW_TYPE),
-                       true);
+                       true,
+                       TimestampFormat.ISO_8601);
 
                final Map<String, String> options = getAllOptions();
 
@@ -120,6 +122,7 @@ public class CanalJsonFormatFactoryTest extends TestLogger {
 
                options.put("format", "canal-json");
                options.put("canal-json.ignore-parse-errors", "true");
+               options.put("canal-json.timestamp-format.standard", "ISO-8601");
                return options;
        }
 
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchemaTest.java
index ff7a343..d8114a8 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchemaTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.json.debezium;
 
+import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
@@ -75,7 +76,8 @@ public class DebeziumJsonDeserializationSchemaTest {
                        SCHEMA,
                        new RowDataTypeInfo(SCHEMA),
                        schemaInclude,
-                       false);
+                       false,
+                       TimestampFormat.ISO_8601);
 
                SimpleCollector collector = new SimpleCollector();
                for (String line : lines) {
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
index 59dd3dd..e19581e 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.formats.json.debezium;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTableImpl;
@@ -66,7 +67,8 @@ public class DebeziumJsonFormatFactoryTest extends TestLogger {
                        ROW_TYPE,
                        new RowDataTypeInfo(ROW_TYPE),
                        true,
-                       true);
+                       true,
+                       TimestampFormat.ISO_8601);
 
                final Map<String, String> options = getAllOptions();
 
@@ -122,6 +124,7 @@ public class DebeziumJsonFormatFactoryTest extends TestLogger {
                options.put("format", "debezium-json");
                options.put("debezium-json.ignore-parse-errors", "true");
                options.put("debezium-json.schema-include", "true");
+               options.put("debezium-json.timestamp-format.standard", 
"ISO-8601");
                return options;
        }
 

Reply via email to