This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new ce0c2671a0a [HUDI-7647] READ_UTC_TIMEZONE doesn't affect log files for MOR tables (#11066) ce0c2671a0a is described below commit ce0c2671a0a5e010173e0e6caf9c21ca2f175a30 Author: Марк Бухнер <66881554+alowa...@users.noreply.github.com> AuthorDate: Wed Apr 24 08:06:25 2024 +0700 [HUDI-7647] READ_UTC_TIMEZONE doesn't affect log files for MOR tables (#11066) --- .../hudi/source/stats/ColumnStatsIndices.java | 2 +- .../table/format/mor/MergeOnReadInputFormat.java | 8 ++--- .../apache/hudi/util/AvroToRowDataConverters.java | 42 +++++++++++++--------- .../apache/hudi/table/ITTestHoodieDataSource.java | 31 ++++++++-------- 4 files changed, 46 insertions(+), 37 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java index 05931876603..7032f299368 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java @@ -272,7 +272,7 @@ public class ColumnStatsIndices { LogicalType logicalType, Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters) { AvroToRowDataConverters.AvroToRowDataConverter converter = - converters.computeIfAbsent(logicalType, k -> AvroToRowDataConverters.createConverter(logicalType)); + converters.computeIfAbsent(logicalType, k -> AvroToRowDataConverters.createConverter(logicalType, true)); return converter.convert(rawVal); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 29bb0a06d8c..3690fc911d8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -351,7 +351,7 @@ public class MergeOnReadInputFormat final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema()); final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema); final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = - AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType()); + AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType(), conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE)); final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, internalSchemaManager.getQuerySchema(), conf, hadoopConf); final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator(); final int[] pkOffset = tableState.getPkOffsetsInRequired(); @@ -431,7 +431,7 @@ public class MergeOnReadInputFormat final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema()); final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema); final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = - AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType()); + AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType(), conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE)); final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, internalSchemaManager.getQuerySchema(), hadoopConf, conf); final Iterator<HoodieRecord<?>> recordsIterator = records.getRecordsIterator(); @@ -478,7 +478,7 @@ public class MergeOnReadInputFormat protected ClosableIterator<RowData> getFullLogFileIterator(MergeOnReadInputSplit split) { final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema()); final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = - AvroToRowDataConverters.createRowConverter(tableState.getRowType()); + AvroToRowDataConverters.createRowConverter(tableState.getRowType(), conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE)); final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, InternalSchema.getEmptyInternalSchema(), conf, hadoopConf); final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator(); @@ -736,7 +736,7 @@ public class MergeOnReadInputFormat this.operationPos = operationPos; this.avroProjection = avroProjection; this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType, flinkConf.getBoolean(FlinkOptions.WRITE_UTC_TIMEZONE)); - this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType); + this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType, flinkConf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE)); this.projection = projection; this.instantRange = split.getInstantRange().orElse(null); List<String> mergers = Arrays.stream(flinkConf.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(",")) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java index 38633b8ad9e..0caafca8259 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java @@ -43,6 +43,7 @@ import org.joda.time.DateTimeFieldType; import java.io.Serializable; import java.lang.reflect.Array; import java.nio.ByteBuffer; +import java.sql.Timestamp; import java.time.Instant; import java.time.LocalDate; import java.time.LocalTime; @@ -72,12 +73,15 @@ public class AvroToRowDataConverters { // ------------------------------------------------------------------------------------- // Runtime Converters // ------------------------------------------------------------------------------------- - public static AvroToRowDataConverter createRowConverter(RowType rowType) { + return createRowConverter(rowType, true); + } + + public static AvroToRowDataConverter createRowConverter(RowType rowType, boolean utcTimezone) { final AvroToRowDataConverter[] fieldConverters = rowType.getFields().stream() .map(RowType.RowField::getType) - .map(AvroToRowDataConverters::createNullableConverter) + .map(type -> AvroToRowDataConverters.createNullableConverter(type, utcTimezone)) .toArray(AvroToRowDataConverter[]::new); final int arity = rowType.getFieldCount(); @@ -94,8 +98,8 @@ public class AvroToRowDataConverters { /** * Creates a runtime converter which is null safe. */ - private static AvroToRowDataConverter createNullableConverter(LogicalType type) { - final AvroToRowDataConverter converter = createConverter(type); + private static AvroToRowDataConverter createNullableConverter(LogicalType type, boolean utcTimezone) { + final AvroToRowDataConverter converter = createConverter(type, utcTimezone); return avroObject -> { if (avroObject == null) { return null; @@ -107,7 +111,7 @@ public class AvroToRowDataConverters { /** * Creates a runtime converter which assuming input object is not null. */ - public static AvroToRowDataConverter createConverter(LogicalType type) { + public static AvroToRowDataConverter createConverter(LogicalType type, boolean utcTimezone) { switch (type.getTypeRoot()) { case NULL: return avroObject -> null; @@ -129,9 +133,9 @@ public class AvroToRowDataConverters { case TIME_WITHOUT_TIME_ZONE: return AvroToRowDataConverters::convertToTime; case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return createTimestampConverter(((LocalZonedTimestampType) type).getPrecision()); + return createTimestampConverter(((LocalZonedTimestampType) type).getPrecision(), true); case TIMESTAMP_WITHOUT_TIME_ZONE: - return createTimestampConverter(((TimestampType) type).getPrecision()); + return createTimestampConverter(((TimestampType) type).getPrecision(), utcTimezone); case CHAR: case VARCHAR: return avroObject -> StringData.fromString(avroObject.toString()); @@ -141,12 +145,12 @@ public class AvroToRowDataConverters { case DECIMAL: return createDecimalConverter((DecimalType) type); case ARRAY: - return createArrayConverter((ArrayType) type); + return createArrayConverter((ArrayType) type, utcTimezone); case ROW: - return createRowConverter((RowType) type); + return createRowConverter((RowType) type, utcTimezone); case MAP: case MULTISET: - return createMapConverter(type); + return createMapConverter(type, utcTimezone); default: throw new UnsupportedOperationException("Unsupported type: " + type); } @@ -170,9 +174,9 @@ public class AvroToRowDataConverters { }; } - private static AvroToRowDataConverter createArrayConverter(ArrayType arrayType) { + private static AvroToRowDataConverter createArrayConverter(ArrayType arrayType, boolean utcTimezone) { final AvroToRowDataConverter elementConverter = - createNullableConverter(arrayType.getElementType()); + createNullableConverter(arrayType.getElementType(), utcTimezone); final Class<?> elementClass = LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType()); @@ -187,11 +191,11 @@ public class AvroToRowDataConverters { }; } - private static AvroToRowDataConverter createMapConverter(LogicalType type) { + private static AvroToRowDataConverter createMapConverter(LogicalType type, boolean utcTimezone) { final AvroToRowDataConverter keyConverter = - createConverter(DataTypes.STRING().getLogicalType()); + createConverter(DataTypes.STRING().getLogicalType(), utcTimezone); final AvroToRowDataConverter valueConverter = - createNullableConverter(AvroSchemaConverter.extractValueTypeToAvroMap(type)); + createNullableConverter(AvroSchemaConverter.extractValueTypeToAvroMap(type), utcTimezone); return avroObject -> { final Map<?, ?> map = (Map<?, ?>) avroObject; @@ -205,7 +209,7 @@ public class AvroToRowDataConverters { }; } - private static AvroToRowDataConverter createTimestampConverter(int precision) { + private static AvroToRowDataConverter createTimestampConverter(int precision, boolean utcTimezone) { final ChronoUnit chronoUnit; if (precision <= 3) { chronoUnit = ChronoUnit.MILLIS; @@ -233,7 +237,11 @@ public class AvroToRowDataConverters { "Unexpected object type for TIMESTAMP logical type. Received: " + avroObject); } } - return TimestampData.fromInstant(instant); + if (utcTimezone) { + return TimestampData.fromInstant(instant); + } else { + return TimestampData.fromTimestamp(Timestamp.from(instant)); // this applies the local timezone + } }; } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index ac38d92a577..bf050fe399d 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -480,7 +480,7 @@ public class ITTestHoodieDataSource { } @ParameterizedTest - @MethodSource("tableTypeAndPartitioningParams") + @MethodSource("tableTypeAndBooleanTrueFalseParams") void testStreamReadFilterByPartition(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception { Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setString(FlinkOptions.TABLE_NAME, "t1"); @@ -568,7 +568,7 @@ public class ITTestHoodieDataSource { } @ParameterizedTest - @MethodSource("tableTypeAndPartitioningParams") + @MethodSource("tableTypeAndBooleanTrueFalseParams") void testWriteAndReadWithProctimeSequence(HoodieTableType tableType, boolean hiveStylePartitioning) { TableEnvironment tableEnv = batchTableEnv; String hoodieTableDDL = sql("t1") @@ -591,7 +591,7 @@ public class ITTestHoodieDataSource { } @ParameterizedTest - @MethodSource("tableTypeAndPartitioningParams") + @MethodSource("tableTypeAndBooleanTrueFalseParams") void testWriteAndReadWithProctimeSequenceWithTsColumnExisting(HoodieTableType tableType, boolean hiveStylePartitioning) { TableEnvironment tableEnv = batchTableEnv; String hoodieTableDDL = sql("t1") @@ -641,7 +641,7 @@ public class ITTestHoodieDataSource { } @ParameterizedTest - @MethodSource("tableTypeAndPartitioningParams") + @MethodSource("tableTypeAndBooleanTrueFalseParams") void testBatchModeUpsert(HoodieTableType tableType, boolean hiveStylePartitioning) { TableEnvironment tableEnv = batchTableEnv; String hoodieTableDDL = sql("t1") @@ -1836,8 +1836,8 @@ public class ITTestHoodieDataSource { } @ParameterizedTest - @EnumSource(value = HoodieTableType.class) - void testWriteReadWithTimestampWithoutTZ(HoodieTableType tableType) { + @MethodSource("tableTypeAndBooleanTrueFalseParams") + void testWriteReadWithTimestampWithoutTZ(HoodieTableType tableType, boolean readUtcTimezone) { TableEnvironment tableEnv = batchTableEnv; tableEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles")); String createTable = sql("t1") @@ -1849,8 +1849,7 @@ public class ITTestHoodieDataSource { .option(FlinkOptions.PRECOMBINE_FIELD, "f1") .option(FlinkOptions.TABLE_TYPE, tableType) .option(FlinkOptions.WRITE_UTC_TIMEZONE, false) - //FlinkOptions.READ_UTC_TIMEZONE doesn't affect in MergeOnReadInputFormat since the option isn't supported in AvroToRowDataConverters - //.option(FlinkOptions.READ_UTC_TIMEZONE, false) + .option(FlinkOptions.READ_UTC_TIMEZONE, readUtcTimezone) .pkField("f0") .noPartition() .end(); @@ -1872,15 +1871,17 @@ public class ITTestHoodieDataSource { List<Row> result = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from t1").execute().collect()); formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"); + + final ZoneId expectedZoneId = readUtcTimezone ? ZoneId.of("UTC") : ZoneId.systemDefault(); final String expected = "[" + "+I[1" + ", abc" - + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 1000), ZoneId.of("UTC"))) - + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 2000), ZoneId.of("UTC"))) + "], " + + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 1000), expectedZoneId)) + + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 2000), expectedZoneId)) + "], " + "+I[2" + ", def" - + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 3000), ZoneId.of("UTC"))) - + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 4000), ZoneId.of("UTC"))) + "]]"; + + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 3000), expectedZoneId)) + + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 4000), expectedZoneId)) + "]]"; assertRowsEquals(result, expected); } @@ -2015,7 +2016,7 @@ public class ITTestHoodieDataSource { } @ParameterizedTest - @MethodSource("tableTypeAndPartitioningParams") + @MethodSource("tableTypeAndBooleanTrueFalseParams") void testDynamicPartitionPrune(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception { Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setString(FlinkOptions.TABLE_NAME, "t1"); @@ -2147,9 +2148,9 @@ public class ITTestHoodieDataSource { } /** - * Return test params => (HoodieTableType, hive style partitioning). + * Return test params => (HoodieTableType, true/false). */ - private static Stream<Arguments> tableTypeAndPartitioningParams() { + private static Stream<Arguments> tableTypeAndBooleanTrueFalseParams() { Object[][] data = new Object[][] { {HoodieTableType.COPY_ON_WRITE, false},