wuchong commented on a change in pull request #12756:
URL: https://github.com/apache/flink/pull/12756#discussion_r450747455



##########
File path: docs/dev/table/connectors/formats/json.md
##########
@@ -108,10 +108,12 @@ Format Options
       <td>optional</td>
       <td style="word-wrap: break-word;"><code>'SQL'</code></td>
       <td>String</td>
-      <td>Specify the input and output timestamp format. Currently supported 
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+      <td>Specify the input and output timestamp/timestamp with local time 
zone format. Currently supported values are <code>'SQL'</code> and 
<code>'ISO-8601'</code>:
       <ul>
-        <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and 
output timestamp in the same format.</li>
-        <li>Option <code>'ISO-8601'</code>will parse input timestamp in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and 
output timestamp in the same format.</li>
+        <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g "2020-12-30 12:13:14.123", 
+        parse input timestamp with local time zone in "yyyy-MM-dd 
HH:mm:ss.s{precision}'Z'" format, e.g "2020-12-30 12:13:14.123Z" and output 
timestamp in the same format.</li>

Review comment:
       ```suggestion
           parse input TIMESTAMP WITH LOCAL TIME ZONE values in "yyyy-MM-dd 
HH:mm:ss.s{precision}'Z'" format, e.g "2020-12-30 12:13:14.123Z" and output 
timestamp in the same format.</li>
   ```

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
##########
@@ -444,10 +444,9 @@ private Date convertToDate(ObjectMapper mapper, JsonNode 
jsonNode) {
        }
 
        private LocalDateTime convertToLocalDateTime(ObjectMapper mapper, 
JsonNode jsonNode) {
-               // according to RFC 3339 every date-time must have a timezone;
-               // until we have full timezone support, we only support UTC;
-               // users can parse their time as string as a workaround
-               TemporalAccessor parsedTimestamp = 
RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+               // The input of Time can't contain any zone info
+               // Users have to cast time with zone into local time manually.
+               TemporalAccessor parsedTimestamp = 
ISO8601_TIMESTAMP_FORMAT.parse(jsonNode.asText());

Review comment:
       We don't need to udpate legacy json format (keep comatibility), we also 
didn't change the behavior when we support timestamp format for TIMESTAMP type, 
right?

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
##########
@@ -303,6 +308,24 @@ private TimestampData convertToTimestamp(JsonNode 
jsonNode) {
                return 
TimestampData.fromLocalDateTime(LocalDateTime.of(localDate, localTime));
        }
 
+       private TimestampData convertToTimestampWithLocalZone(JsonNode 
jsonNode){
+               TemporalAccessor parsedTimestampWithLocalZone;
+               switch (timestampFormat){
+                       case SQL:
+                               parsedTimestampWithLocalZone = 
SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jsonNode.asText());
+                               break;
+                       case ISO_8601:
+                               parsedTimestampWithLocalZone = 
ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jsonNode.asText());
+                               break;
+                       default:
+                               throw new 
TableException(String.format("Unsupported timestamp format '%s'. Validator 
should have checked that.", timestampFormat));
+               }
+               LocalTime localTime = 
parsedTimestampWithLocalZone.query(TemporalQueries.localTime());
+               LocalDate localDate = 
parsedTimestampWithLocalZone.query(TemporalQueries.localDate());
+
+               return TimestampData.fromInstant(LocalDateTime.of(localDate, 
localTime).atOffset(ZoneOffset.UTC).toInstant());

Review comment:
       `return TimestampData.fromInstant(LocalDateTime.of(localDate, 
localTime).toInstant(ZoneOffset.UTC));`

##########
File path: docs/dev/table/connectors/formats/json.md
##########
@@ -108,10 +108,12 @@ Format Options
       <td>optional</td>
       <td style="word-wrap: break-word;"><code>'SQL'</code></td>
       <td>String</td>
-      <td>Specify the input and output timestamp format. Currently supported 
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+      <td>Specify the input and output timestamp/timestamp with local time 
zone format. Currently supported values are <code>'SQL'</code> and 
<code>'ISO-8601'</code>:
       <ul>
-        <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and 
output timestamp in the same format.</li>
-        <li>Option <code>'ISO-8601'</code>will parse input timestamp in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and 
output timestamp in the same format.</li>
+        <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g "2020-12-30 12:13:14.123", 

Review comment:
       ```suggestion
           <li>Option <code>'SQL'</code> will parse input TIMESTAMP values in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g "2020-12-30 12:13:14.123", 
   ```

##########
File path: docs/dev/table/connectors/formats/json.md
##########
@@ -108,10 +108,12 @@ Format Options
       <td>optional</td>
       <td style="word-wrap: break-word;"><code>'SQL'</code></td>
       <td>String</td>
-      <td>Specify the input and output timestamp format. Currently supported 
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+      <td>Specify the input and output timestamp/timestamp with local time 
zone format. Currently supported values are <code>'SQL'</code> and 
<code>'ISO-8601'</code>:
       <ul>
-        <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and 
output timestamp in the same format.</li>
-        <li>Option <code>'ISO-8601'</code>will parse input timestamp in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and 
output timestamp in the same format.</li>
+        <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g "2020-12-30 12:13:14.123", 
+        parse input timestamp with local time zone in "yyyy-MM-dd 
HH:mm:ss.s{precision}'Z'" format, e.g "2020-12-30 12:13:14.123Z" and output 
timestamp in the same format.</li>
+        <li>Option <code>'ISO-8601'</code>will parse input timestamp in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g "2020-12-30T12:13:14.123" 

Review comment:
       ```suggestion
           <li>Option <code>'ISO-8601'</code>will parse input TIMESTAMP in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g "2020-12-30T12:13:14.123" 
   ```

##########
File path: 
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
##########
@@ -502,9 +516,17 @@ private void testParseErrors(TestSpec spec) throws 
Exception {
                        .json("{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}")
                        .rowType(ROW(FIELD("map", MAP(STRING(), INT()))))
                        .expect(Row.of(createHashMap("key1", 123, "key2", 
null)))
-                       .expectErrorMessage("Failed to deserialize JSON 
'{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'")
+                       .expectErrorMessage("Failed to deserialize JSON 
'{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'"),
 
+               TestSpec
+                       .json("{\"id\":\"2019-11-12T18:00:12\"}")
+                       .rowType(ROW(FIELD("id", 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(0))))
+                       .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"2019-11-12T18:00:12\"}'"),
 
+               TestSpec
+                       .json("{\"id\":\"2019-11-12T18:00:12\"}")
+                       .rowType(ROW(FIELD("id", 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(0))))
+                       .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"2019-11-12T18:00:12+0800\"}'")

Review comment:
       Why the test case is the same with above one, but the exception message 
is different?

##########
File path: docs/dev/table/connectors/formats/json.md
##########
@@ -108,10 +108,12 @@ Format Options
       <td>optional</td>
       <td style="word-wrap: break-word;"><code>'SQL'</code></td>
       <td>String</td>
-      <td>Specify the input and output timestamp format. Currently supported 
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+      <td>Specify the input and output timestamp/timestamp with local time 
zone format. Currently supported values are <code>'SQL'</code> and 
<code>'ISO-8601'</code>:

Review comment:
       ```suggestion
         <td>Specify the input and output timestamp format for 
<code>TIMESTAMP</code> and <code>TIMESTAMP WITH LOCAL TIME ZONE</code> type. 
Currently supported values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
   ```

##########
File path: 
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
##########
@@ -502,9 +516,17 @@ private void testParseErrors(TestSpec spec) throws 
Exception {
                        .json("{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}")
                        .rowType(ROW(FIELD("map", MAP(STRING(), INT()))))
                        .expect(Row.of(createHashMap("key1", 123, "key2", 
null)))
-                       .expectErrorMessage("Failed to deserialize JSON 
'{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'")
+                       .expectErrorMessage("Failed to deserialize JSON 
'{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'"),
 
+               TestSpec
+                       .json("{\"id\":\"2019-11-12T18:00:12\"}")
+                       .rowType(ROW(FIELD("id", 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(0))))
+                       .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"2019-11-12T18:00:12\"}'"),
 
+               TestSpec
+                       .json("{\"id\":\"2019-11-12T18:00:12\"}")
+                       .rowType(ROW(FIELD("id", 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(0))))
+                       .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"2019-11-12T18:00:12+0800\"}'")

Review comment:
       Why the test case is the same with above one, but the exception message 
is different?

##########
File path: docs/dev/table/connectors/formats/json.md
##########
@@ -187,6 +189,10 @@ The following table lists the type mapping from Flink type 
to JSON type.
       <td><code>TIMESTAMP</code></td>
       <td><code>string with format: date-time</code></td>
     </tr>
+    <tr>
+      <td><code>TIMESTAMP_WITH_LOCAL_TIME_ZONE</code></td>
+      <td><code>string with format: date-time(with UTC time zone)</code></td>

Review comment:
       ```suggestion
         <td><code>string with format: date-time (with UTC time 
zone)</code></td>
   ```

##########
File path: docs/dev/table/connectors/formats/json.md
##########
@@ -108,10 +108,12 @@ Format Options
       <td>optional</td>
       <td style="word-wrap: break-word;"><code>'SQL'</code></td>
       <td>String</td>
-      <td>Specify the input and output timestamp format. Currently supported 
values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
+      <td>Specify the input and output timestamp/timestamp with local time 
zone format. Currently supported values are <code>'SQL'</code> and 
<code>'ISO-8601'</code>:
       <ul>
-        <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and 
output timestamp in the same format.</li>
-        <li>Option <code>'ISO-8601'</code>will parse input timestamp in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and 
output timestamp in the same format.</li>
+        <li>Option <code>'SQL'</code> will parse input timestamp in 
"yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g "2020-12-30 12:13:14.123", 
+        parse input timestamp with local time zone in "yyyy-MM-dd 
HH:mm:ss.s{precision}'Z'" format, e.g "2020-12-30 12:13:14.123Z" and output 
timestamp in the same format.</li>
+        <li>Option <code>'ISO-8601'</code>will parse input timestamp in 
"yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g "2020-12-30T12:13:14.123" 
+        parse input timestamp with local time zone in 
"yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" format, e.g "2020-12-30T12:13:14.123Z" 
and output timestamp in the same format.</li>

Review comment:
       ```suggestion
           parse input TIMESTAMP WITH LOCAL TIME ZONE in 
"yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" format, e.g "2020-12-30T12:13:14.123Z" 
and output timestamp in the same format.</li>
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to