This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 99d52b79d6061f441ee0a74042558c5911ca9dd5
Author: chengming <[email protected]>
AuthorDate: Sat Mar 2 09:08:06 2024 +0800

    [HUDI-9424] Support using local timezone when writing flink TIMESTAMP data 
(#10594)
---
 .../apache/hudi/configuration/FlinkOptions.java    | 11 ++-
 .../sink/transform/RowDataToHoodieFunction.java    |  3 +-
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |  2 +-
 .../org/apache/hudi/table/HoodieTableSource.java   |  2 +-
 .../table/format/mor/MergeOnReadInputFormat.java   |  4 +-
 .../apache/hudi/util/RowDataToAvroConverters.java  | 54 ++++++++++----
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 53 ++++++++++++++
 .../hudi/utils/TestRowDataToAvroConverters.java    | 84 ++++++++++++++++++++++
 8 files changed, 195 insertions(+), 18 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 6c976b868fd..6f0f6db7c28 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -290,7 +290,7 @@ public class FlinkOptions extends HoodieConfig {
           + "   log file records(combines the two records with same key for 
base and log file records), then read the left log file records");
 
   @AdvancedConfig
-  public static final ConfigOption<Boolean> UTC_TIMEZONE = ConfigOptions
+  public static final ConfigOption<Boolean> READ_UTC_TIMEZONE = ConfigOptions
       .key("read.utc-timezone")
       .booleanType()
       .defaultValue(true)
@@ -481,6 +481,15 @@ public class FlinkOptions extends HoodieConfig {
   public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH";
   public static final String PARTITION_FORMAT_DAY = "yyyyMMdd";
   public static final String PARTITION_FORMAT_DASHED_DAY = "yyyy-MM-dd";
+
+  @AdvancedConfig
+  public static final ConfigOption<Boolean> WRITE_UTC_TIMEZONE = ConfigOptions
+        .key("write.utc-timezone")
+        .booleanType()
+        .defaultValue(true)
+        .withDescription("Use UTC timezone or local timezone for the conversion 
between epoch"
+            + " time and LocalDateTime. Default value is UTC timezone for 
forward compatibility.");
+
   @AdvancedConfig
   public static final ConfigOption<String> PARTITION_FORMAT = ConfigOptions
       .key("write.partition.format")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
index bfc7d7d62ad..0a13bea513d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 import org.apache.hudi.sink.utils.PayloadCreation;
@@ -84,7 +85,7 @@ public class RowDataToHoodieFunction<I extends RowData, O 
extends HoodieRecord>
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     this.avroSchema = StreamerUtil.getSourceSchema(this.config);
-    this.converter = RowDataToAvroConverters.createConverter(this.rowType);
+    this.converter = RowDataToAvroConverters.createConverter(this.rowType, 
this.config.getBoolean(FlinkOptions.WRITE_UTC_TIMEZONE));
     this.keyGenerator =
         HoodieAvroKeyGeneratorFactory
             .createKeyGenerator(flinkConf2TypedProperties(this.config));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index e8050d15761..25ba73f97d3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -455,7 +455,7 @@ public class FlinkStreamerConfig extends Configuration {
       conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, 
config.sourceAvroSchemaPath);
     }
     conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, config.sourceAvroSchema);
-    conf.setBoolean(FlinkOptions.UTC_TIMEZONE, config.utcTimezone);
+    conf.setBoolean(FlinkOptions.READ_UTC_TIMEZONE, config.utcTimezone);
     conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, 
config.writePartitionUrlEncode);
     conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, 
config.hiveStylePartitioning);
     conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, config.writeTaskMaxSize);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index dc6cddd4a55..b5fdea7a229 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -551,7 +551,7 @@ public class HoodieTableSource implements
         this.predicates,
         this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // 
ParquetInputFormat always uses the limit value
         getParquetConf(this.conf, this.hadoopConf),
-        this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
+        this.conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE),
         this.internalSchemaManager
     );
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index f13098fc7c7..29bb0a06d8c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -332,7 +332,7 @@ public class MergeOnReadInputFormat
 
     return RecordIterators.getParquetRecordIterator(
         internalSchemaManager,
-        this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
+        this.conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE),
         true,
         HadoopConfigurations.getParquetConf(this.conf, hadoopConf),
         fieldNames.toArray(new String[0]),
@@ -735,7 +735,7 @@ public class MergeOnReadInputFormat
       this.emitDelete = emitDelete;
       this.operationPos = operationPos;
       this.avroProjection = avroProjection;
-      this.rowDataToAvroConverter = 
RowDataToAvroConverters.createConverter(tableRowType);
+      this.rowDataToAvroConverter = 
RowDataToAvroConverters.createConverter(tableRowType, 
flinkConf.getBoolean(FlinkOptions.WRITE_UTC_TIMEZONE));
       this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(requiredRowType);
       this.projection = projection;
       this.instantRange = split.getInstantRange().orElse(null);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
index ff2903c0a73..23dbe71721a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
@@ -77,6 +77,10 @@ public class RowDataToAvroConverters {
    * Flink Table & SQL internal data structures to corresponding Avro data 
structures.
    */
   public static RowDataToAvroConverter createConverter(LogicalType type) {
+    return createConverter(type, true);
+  }
+
+  public static RowDataToAvroConverter createConverter(LogicalType type, 
boolean utcTimezone) {
     final RowDataToAvroConverter converter;
     switch (type.getTypeRoot()) {
       case NULL:
@@ -156,8 +160,34 @@ public class RowDataToAvroConverters {
             };
         break;
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        int precision = DataTypeUtils.precision(type);
+        if (precision <= 3) {
+          converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ((TimestampData) object).toInstant().toEpochMilli();
+              }
+          };
+        } else if (precision <= 6) {
+          converter =
+              new RowDataToAvroConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object convert(Schema schema, Object object) {
+                  Instant instant = ((TimestampData) object).toInstant();
+                  return  
Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1000_000), 
instant.getNano() / 1000);
+                }
+            };
+        } else {
+          throw new UnsupportedOperationException("Unsupported timestamp 
precision: " + precision);
+        }
+        break;
       case TIMESTAMP_WITHOUT_TIME_ZONE:
-        final int precision = DataTypeUtils.precision(type);
+        precision = DataTypeUtils.precision(type);
         if (precision <= 3) {
           converter =
               new RowDataToAvroConverter() {
@@ -165,7 +195,7 @@ public class RowDataToAvroConverters {
 
                 @Override
                 public Object convert(Schema schema, Object object) {
-                  return ((TimestampData) object).toInstant().toEpochMilli();
+                  return utcTimezone ? ((TimestampData) 
object).toInstant().toEpochMilli() : ((TimestampData) 
object).toTimestamp().getTime();
                 }
               };
         } else if (precision <= 6) {
@@ -175,7 +205,7 @@ public class RowDataToAvroConverters {
 
                 @Override
                 public Object convert(Schema schema, Object object) {
-                  Instant instant = ((TimestampData) object).toInstant();
+                  Instant instant = utcTimezone ? ((TimestampData) 
object).toInstant() : ((TimestampData) object).toTimestamp().toInstant();
                   return  
Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1000_000), 
instant.getNano() / 1000);
                 }
               };
@@ -196,14 +226,14 @@ public class RowDataToAvroConverters {
             };
         break;
       case ARRAY:
-        converter = createArrayConverter((ArrayType) type);
+        converter = createArrayConverter((ArrayType) type, utcTimezone);
         break;
       case ROW:
-        converter = createRowConverter((RowType) type);
+        converter = createRowConverter((RowType) type, utcTimezone);
         break;
       case MAP:
       case MULTISET:
-        converter = createMapConverter(type);
+        converter = createMapConverter(type, utcTimezone);
         break;
       case RAW:
       default:
@@ -241,10 +271,10 @@ public class RowDataToAvroConverters {
     };
   }
 
-  private static RowDataToAvroConverter createRowConverter(RowType rowType) {
+  private static RowDataToAvroConverter createRowConverter(RowType rowType, 
boolean utcTimezone) {
     final RowDataToAvroConverter[] fieldConverters =
         rowType.getChildren().stream()
-            .map(RowDataToAvroConverters::createConverter)
+            .map(type -> createConverter(type, utcTimezone))
             .toArray(RowDataToAvroConverter[]::new);
     final LogicalType[] fieldTypes =
         rowType.getFields().stream()
@@ -276,10 +306,10 @@ public class RowDataToAvroConverters {
     };
   }
 
-  private static RowDataToAvroConverter createArrayConverter(ArrayType 
arrayType) {
+  private static RowDataToAvroConverter createArrayConverter(ArrayType 
arrayType, boolean utcTimezone) {
     LogicalType elementType = arrayType.getElementType();
     final ArrayData.ElementGetter elementGetter = 
ArrayData.createElementGetter(elementType);
-    final RowDataToAvroConverter elementConverter = 
createConverter(arrayType.getElementType());
+    final RowDataToAvroConverter elementConverter = 
createConverter(arrayType.getElementType(), utcTimezone);
 
     return new RowDataToAvroConverter() {
       private static final long serialVersionUID = 1L;
@@ -299,10 +329,10 @@ public class RowDataToAvroConverters {
     };
   }
 
-  private static RowDataToAvroConverter createMapConverter(LogicalType type) {
+  private static RowDataToAvroConverter createMapConverter(LogicalType type, 
boolean utcTimezone) {
     LogicalType valueType = 
AvroSchemaConverter.extractValueTypeToAvroMap(type);
     final ArrayData.ElementGetter valueGetter = 
ArrayData.createElementGetter(valueType);
-    final RowDataToAvroConverter valueConverter = createConverter(valueType);
+    final RowDataToAvroConverter valueConverter = createConverter(valueType, 
utcTimezone);
 
     return new RowDataToAvroConverter() {
       private static final long serialVersionUID = 1L;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 111bb42e73e..de80a219989 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -59,7 +59,10 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
+import java.time.Instant;
+import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -1823,6 +1826,56 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result, expected);
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testWriteReadWithTimestampWithoutTZ(HoodieTableType tableType) {
+    TableEnvironment tableEnv = batchTableEnv;
+    tableEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles"));
+    String createTable = sql("t1")
+        .field("f0 int")
+        .field("f1 varchar(10)")
+        .field("f2 TIMESTAMP(3)")
+        .field("f3 TIMESTAMP(6)")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.PRECOMBINE_FIELD, "f1")
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.WRITE_UTC_TIMEZONE, false)
+        //FlinkOptions.READ_UTC_TIMEZONE doesn't take effect in 
MergeOnReadInputFormat since the option isn't supported in 
AvroToRowDataConverters
+        //.option(FlinkOptions.READ_UTC_TIMEZONE, false)
+        .pkField("f0")
+        .noPartition()
+        .end();
+    tableEnv.executeSql(createTable);
+
+    long epochMillis = 0L;
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss");
+    String insertInto = "insert into t1 values\n"
+        + "(1"
+        + ", 'abc'"
+        + ", TIMESTAMP '" + 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 
1000), ZoneId.systemDefault())) + "'"
+        + ", TIMESTAMP '" + 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 
2000), ZoneId.systemDefault())) + "'),\n"
+        + "(2"
+        + ", 'def'"
+        + ", TIMESTAMP '" + 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 
3000), ZoneId.systemDefault())) + "'"
+        + ", TIMESTAMP '" + 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 
4000), ZoneId.systemDefault())) + "')";
+    execInsertSql(tableEnv, insertInto);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");
+    final String expected = "["
+        + "+I[1"
+        + ", abc"
+        + ", " + 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 
1000), ZoneId.of("UTC")))
+        + ", " + 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 
2000), ZoneId.of("UTC"))) + "], "
+        + "+I[2"
+        + ", def"
+        + ", " + 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 
3000), ZoneId.of("UTC")))
+        + ", " + 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 
4000), ZoneId.of("UTC"))) + "]]";
+
+    assertRowsEquals(result, expected);
+  }
+
   @ParameterizedTest
   @MethodSource("tableTypeQueryTypeNumInsertAndCompactionDeltaCommitsParams")
   void testReadMetaFields(HoodieTableType tableType, String queryType, int 
numInsertBatches, int compactionDeltaCommits) throws Exception {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
new file mode 100644
index 00000000000..0ab0626d034
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonToRowDataConverters;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.RowDataToAvroConverters;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+
+class TestRowDataToAvroConverters {
+
+  DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss");
+  @Test
+  void testRowDataToAvroStringToRowDataWithLocalTimezone() throws 
JsonProcessingException {
+    String timestampFromLocal = "2021-03-30 07:44:29";
+
+    DataType rowDataType = ROW(FIELD("timestamp_from_local", TIMESTAMP()));
+    JsonToRowDataConverters.JsonToRowDataConverter jsonToRowDataConverter =
+            new JsonToRowDataConverters(true, true, TimestampFormat.SQL)
+                    .createConverter(rowDataType.getLogicalType());
+    Object rowData = jsonToRowDataConverter.convert(new 
ObjectMapper().readTree("{\"timestamp_from_local\":\"" + timestampFromLocal + 
"\"}"));
+
+    RowType rowType = (RowType) DataTypes.ROW(DataTypes.FIELD("f_timestamp", 
DataTypes.TIMESTAMP(3))).getLogicalType();
+    RowDataToAvroConverters.RowDataToAvroConverter converter =
+            RowDataToAvroConverters.createConverter(rowType, false);
+    GenericRecord avroRecord =
+            (GenericRecord) 
converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
+    Assertions.assertEquals(timestampFromLocal, 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) 
avroRecord.get(0)), ZoneId.systemDefault())));
+  }
+
+  @Test
+  void testRowDataToAvroStringToRowDataWithUtcTimezone() throws 
JsonProcessingException {
+    String timestampFromUtc0 = "2021-03-30 07:44:29";
+
+    DataType rowDataType = ROW(FIELD("timestamp_from_utc_0", TIMESTAMP()));
+    JsonToRowDataConverters.JsonToRowDataConverter jsonToRowDataConverter =
+            new JsonToRowDataConverters(true, true, TimestampFormat.SQL)
+                    .createConverter(rowDataType.getLogicalType());
+    Object rowData = jsonToRowDataConverter.convert(new 
ObjectMapper().readTree("{\"timestamp_from_utc_0\":\"" + timestampFromUtc0 + 
"\"}"));
+
+    RowType rowType = (RowType) DataTypes.ROW(DataTypes.FIELD("f_timestamp", 
DataTypes.TIMESTAMP(3))).getLogicalType();
+    RowDataToAvroConverters.RowDataToAvroConverter converter =
+            RowDataToAvroConverters.createConverter(rowType);
+    GenericRecord avroRecord =
+            (GenericRecord) 
converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
+    Assertions.assertEquals(timestampFromUtc0, 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) 
avroRecord.get(0)), ZoneId.of("UTC"))));
+    Assertions.assertEquals("2021-03-30 08:44:29", 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) 
avroRecord.get(0)), ZoneId.of("UTC+1"))));
+    Assertions.assertEquals("2021-03-30 15:44:29", 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) 
avroRecord.get(0)), ZoneId.of("Asia/Shanghai"))));
+  }
+}
\ No newline at end of file

Reply via email to