This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 99d52b79d6061f441ee0a74042558c5911ca9dd5 Author: chengming <[email protected]> AuthorDate: Sat Mar 2 09:08:06 2024 +0800 [HUDI-9424] Support using local timezone when writing flink TIMESTAMP data (#10594) --- .../apache/hudi/configuration/FlinkOptions.java | 11 ++- .../sink/transform/RowDataToHoodieFunction.java | 3 +- .../apache/hudi/streamer/FlinkStreamerConfig.java | 2 +- .../org/apache/hudi/table/HoodieTableSource.java | 2 +- .../table/format/mor/MergeOnReadInputFormat.java | 4 +- .../apache/hudi/util/RowDataToAvroConverters.java | 54 ++++++++++---- .../apache/hudi/table/ITTestHoodieDataSource.java | 53 ++++++++++++++ .../hudi/utils/TestRowDataToAvroConverters.java | 84 ++++++++++++++++++++++ 8 files changed, 195 insertions(+), 18 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 6c976b868fd..6f0f6db7c28 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -290,7 +290,7 @@ public class FlinkOptions extends HoodieConfig { + " log file records(combines the two records with same key for base and log file records), then read the left log file records"); @AdvancedConfig - public static final ConfigOption<Boolean> UTC_TIMEZONE = ConfigOptions + public static final ConfigOption<Boolean> READ_UTC_TIMEZONE = ConfigOptions .key("read.utc-timezone") .booleanType() .defaultValue(true) @@ -481,6 +481,15 @@ public class FlinkOptions extends HoodieConfig { public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH"; public static final String PARTITION_FORMAT_DAY = "yyyyMMdd"; public static final String PARTITION_FORMAT_DASHED_DAY = "yyyy-MM-dd"; + + @AdvancedConfig + public static final ConfigOption<Boolean> WRITE_UTC_TIMEZONE = 
ConfigOptions + .key("write.utc-timezone") + .booleanType() + .defaultValue(true) + .withDescription("Use UTC timezone or local timezone to the conversion between epoch" + + " time and LocalDateTime. Default value is utc timezone for forward compatibility."); + @AdvancedConfig public static final ConfigOption<String> PARTITION_FORMAT = ConfigOptions .key("write.partition.format") diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java index bfc7d7d62ad..0a13bea513d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory; import org.apache.hudi.sink.utils.PayloadCreation; @@ -84,7 +85,7 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord> public void open(Configuration parameters) throws Exception { super.open(parameters); this.avroSchema = StreamerUtil.getSourceSchema(this.config); - this.converter = RowDataToAvroConverters.createConverter(this.rowType); + this.converter = RowDataToAvroConverters.createConverter(this.rowType, this.config.getBoolean(FlinkOptions.WRITE_UTC_TIMEZONE)); this.keyGenerator = HoodieAvroKeyGeneratorFactory .createKeyGenerator(flinkConf2TypedProperties(this.config)); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index e8050d15761..25ba73f97d3 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -455,7 +455,7 @@ public class FlinkStreamerConfig extends Configuration { conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, config.sourceAvroSchemaPath); } conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, config.sourceAvroSchema); - conf.setBoolean(FlinkOptions.UTC_TIMEZONE, config.utcTimezone); + conf.setBoolean(FlinkOptions.READ_UTC_TIMEZONE, config.utcTimezone); conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, config.writePartitionUrlEncode); conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, config.hiveStylePartitioning); conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, config.writeTaskMaxSize); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index dc6cddd4a55..b5fdea7a229 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -551,7 +551,7 @@ public class HoodieTableSource implements this.predicates, this.limit == NO_LIMIT_CONSTANT ? 
Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value getParquetConf(this.conf, this.hadoopConf), - this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE), + this.conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE), this.internalSchemaManager ); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index f13098fc7c7..29bb0a06d8c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -332,7 +332,7 @@ public class MergeOnReadInputFormat return RecordIterators.getParquetRecordIterator( internalSchemaManager, - this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE), + this.conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE), true, HadoopConfigurations.getParquetConf(this.conf, hadoopConf), fieldNames.toArray(new String[0]), @@ -735,7 +735,7 @@ public class MergeOnReadInputFormat this.emitDelete = emitDelete; this.operationPos = operationPos; this.avroProjection = avroProjection; - this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType); + this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType, flinkConf.getBoolean(FlinkOptions.WRITE_UTC_TIMEZONE)); this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType); this.projection = projection; this.instantRange = split.getInstantRange().orElse(null); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java index ff2903c0a73..23dbe71721a 100644 --- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java @@ -77,6 +77,10 @@ public class RowDataToAvroConverters { * Flink Table & SQL internal data structures to corresponding Avro data structures. */ public static RowDataToAvroConverter createConverter(LogicalType type) { + return createConverter(type, true); + } + + public static RowDataToAvroConverter createConverter(LogicalType type, boolean utcTimezone) { final RowDataToAvroConverter converter; switch (type.getTypeRoot()) { case NULL: @@ -156,8 +160,34 @@ public class RowDataToAvroConverters { }; break; case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + int precision = DataTypeUtils.precision(type); + if (precision <= 3) { + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ((TimestampData) object).toInstant().toEpochMilli(); + } + }; + } else if (precision <= 6) { + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + Instant instant = ((TimestampData) object).toInstant(); + return Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1000_000), instant.getNano() / 1000); + } + }; + } else { + throw new UnsupportedOperationException("Unsupported timestamp precision: " + precision); + } + break; case TIMESTAMP_WITHOUT_TIME_ZONE: - final int precision = DataTypeUtils.precision(type); + precision = DataTypeUtils.precision(type); if (precision <= 3) { converter = new RowDataToAvroConverter() { @@ -165,7 +195,7 @@ public class RowDataToAvroConverters { @Override public Object convert(Schema schema, Object object) { - return ((TimestampData) object).toInstant().toEpochMilli(); + return utcTimezone ? 
((TimestampData) object).toInstant().toEpochMilli() : ((TimestampData) object).toTimestamp().getTime(); } }; } else if (precision <= 6) { @@ -175,7 +205,7 @@ public class RowDataToAvroConverters { @Override public Object convert(Schema schema, Object object) { - Instant instant = ((TimestampData) object).toInstant(); + Instant instant = utcTimezone ? ((TimestampData) object).toInstant() : ((TimestampData) object).toTimestamp().toInstant(); return Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1000_000), instant.getNano() / 1000); } }; @@ -196,14 +226,14 @@ public class RowDataToAvroConverters { }; break; case ARRAY: - converter = createArrayConverter((ArrayType) type); + converter = createArrayConverter((ArrayType) type, utcTimezone); break; case ROW: - converter = createRowConverter((RowType) type); + converter = createRowConverter((RowType) type, utcTimezone); break; case MAP: case MULTISET: - converter = createMapConverter(type); + converter = createMapConverter(type, utcTimezone); break; case RAW: default: @@ -241,10 +271,10 @@ public class RowDataToAvroConverters { }; } - private static RowDataToAvroConverter createRowConverter(RowType rowType) { + private static RowDataToAvroConverter createRowConverter(RowType rowType, boolean utcTimezone) { final RowDataToAvroConverter[] fieldConverters = rowType.getChildren().stream() - .map(RowDataToAvroConverters::createConverter) + .map(type -> createConverter(type, utcTimezone)) .toArray(RowDataToAvroConverter[]::new); final LogicalType[] fieldTypes = rowType.getFields().stream() @@ -276,10 +306,10 @@ public class RowDataToAvroConverters { }; } - private static RowDataToAvroConverter createArrayConverter(ArrayType arrayType) { + private static RowDataToAvroConverter createArrayConverter(ArrayType arrayType, boolean utcTimezone) { LogicalType elementType = arrayType.getElementType(); final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType); - final RowDataToAvroConverter 
elementConverter = createConverter(arrayType.getElementType()); + final RowDataToAvroConverter elementConverter = createConverter(arrayType.getElementType(), utcTimezone); return new RowDataToAvroConverter() { private static final long serialVersionUID = 1L; @@ -299,10 +329,10 @@ public class RowDataToAvroConverters { }; } - private static RowDataToAvroConverter createMapConverter(LogicalType type) { + private static RowDataToAvroConverter createMapConverter(LogicalType type, boolean utcTimezone) { LogicalType valueType = AvroSchemaConverter.extractValueTypeToAvroMap(type); final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType); - final RowDataToAvroConverter valueConverter = createConverter(valueType); + final RowDataToAvroConverter valueConverter = createConverter(valueType, utcTimezone); return new RowDataToAvroConverter() { private static final long serialVersionUID = 1L; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 111bb42e73e..de80a219989 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -59,7 +59,10 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.File; +import java.time.Instant; +import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -1823,6 +1826,56 @@ public class ITTestHoodieDataSource { assertRowsEquals(result, expected); } + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + void testWriteReadWithTimestampWithoutTZ(HoodieTableType tableType) { + TableEnvironment tableEnv = 
batchTableEnv; + tableEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles")); + String createTable = sql("t1") + .field("f0 int") + .field("f1 varchar(10)") + .field("f2 TIMESTAMP(3)") + .field("f3 TIMESTAMP(6)") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.PRECOMBINE_FIELD, "f1") + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.WRITE_UTC_TIMEZONE, false) + //FlinkOptions.READ_UTC_TIMEZONE has no effect in MergeOnReadInputFormat because AvroToRowDataConverters does not support the option + //.option(FlinkOptions.READ_UTC_TIMEZONE, false) + .pkField("f0") + .noPartition() + .end(); + tableEnv.executeSql(createTable); + + long epochMillis = 0L; + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + String insertInto = "insert into t1 values\n" + + "(1" + + ", 'abc'" + + ", TIMESTAMP '" + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 1000), ZoneId.systemDefault())) + "'" + + ", TIMESTAMP '" + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 2000), ZoneId.systemDefault())) + "'),\n" + + "(2" + + ", 'def'" + + ", TIMESTAMP '" + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 3000), ZoneId.systemDefault())) + "'" + + ", TIMESTAMP '" + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 4000), ZoneId.systemDefault())) + "')"; + execInsertSql(tableEnv, insertInto); + + List<Row> result = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"); + final String expected = "[" + + "+I[1" + + ", abc" + + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 1000), ZoneId.of("UTC"))) + + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 2000), ZoneId.of("UTC"))) + "], " + + "+I[2" + + ", def" + + ", " + 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 3000), ZoneId.of("UTC"))) + + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 4000), ZoneId.of("UTC"))) + "]]"; + + assertRowsEquals(result, expected); + } + @ParameterizedTest @MethodSource("tableTypeQueryTypeNumInsertAndCompactionDeltaCommitsParams") void testReadMetaFields(HoodieTableType tableType, String queryType, int numInsertBatches, int compactionDeltaCommits) throws Exception { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java new file mode 100644 index 00000000000..0ab0626d034 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utils; + +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonToRowDataConverters; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.RowDataToAvroConverters; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.api.DataTypes.FIELD; +import static org.apache.flink.table.api.DataTypes.TIMESTAMP; + +class TestRowDataToAvroConverters { + + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + @Test + void testRowDataToAvroStringToRowDataWithLocalTimezone() throws JsonProcessingException { + String timestampFromLocal = "2021-03-30 07:44:29"; + + DataType rowDataType = ROW(FIELD("timestamp_from_local", TIMESTAMP())); + JsonToRowDataConverters.JsonToRowDataConverter jsonToRowDataConverter = + new JsonToRowDataConverters(true, true, TimestampFormat.SQL) + .createConverter(rowDataType.getLogicalType()); + Object rowData = jsonToRowDataConverter.convert(new ObjectMapper().readTree("{\"timestamp_from_local\":\"" + timestampFromLocal + "\"}")); + + RowType rowType = (RowType) DataTypes.ROW(DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3))).getLogicalType(); + RowDataToAvroConverters.RowDataToAvroConverter converter = + RowDataToAvroConverters.createConverter(rowType, false); + GenericRecord avroRecord = + (GenericRecord) 
converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData); + Assertions.assertEquals(timestampFromLocal, formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) avroRecord.get(0)), ZoneId.systemDefault()))); + } + + @Test + void testRowDataToAvroStringToRowDataWithUtcTimezone() throws JsonProcessingException { + String timestampFromUtc0 = "2021-03-30 07:44:29"; + + DataType rowDataType = ROW(FIELD("timestamp_from_utc_0", TIMESTAMP())); + JsonToRowDataConverters.JsonToRowDataConverter jsonToRowDataConverter = + new JsonToRowDataConverters(true, true, TimestampFormat.SQL) + .createConverter(rowDataType.getLogicalType()); + Object rowData = jsonToRowDataConverter.convert(new ObjectMapper().readTree("{\"timestamp_from_utc_0\":\"" + timestampFromUtc0 + "\"}")); + + RowType rowType = (RowType) DataTypes.ROW(DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3))).getLogicalType(); + RowDataToAvroConverters.RowDataToAvroConverter converter = + RowDataToAvroConverters.createConverter(rowType); + GenericRecord avroRecord = + (GenericRecord) converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData); + Assertions.assertEquals(timestampFromUtc0, formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) avroRecord.get(0)), ZoneId.of("UTC")))); + Assertions.assertEquals("2021-03-30 08:44:29", formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) avroRecord.get(0)), ZoneId.of("UTC+1")))); + Assertions.assertEquals("2021-03-30 15:44:29", formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) avroRecord.get(0)), ZoneId.of("Asia/Shanghai")))); + } +} \ No newline at end of file
