This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ce0c2671a0a [HUDI-7647] READ_UTC_TIMEZONE doesn't affect log files for
MOR tables (#11066)
ce0c2671a0a is described below
commit ce0c2671a0a5e010173e0e6caf9c21ca2f175a30
Author: Марк Бухнер <[email protected]>
AuthorDate: Wed Apr 24 08:06:25 2024 +0700
[HUDI-7647] READ_UTC_TIMEZONE doesn't affect log files for MOR tables
(#11066)
---
.../hudi/source/stats/ColumnStatsIndices.java | 2 +-
.../table/format/mor/MergeOnReadInputFormat.java | 8 ++---
.../apache/hudi/util/AvroToRowDataConverters.java | 42 +++++++++++++---------
.../apache/hudi/table/ITTestHoodieDataSource.java | 31 ++++++++--------
4 files changed, 46 insertions(+), 37 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
index 05931876603..7032f299368 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
@@ -272,7 +272,7 @@ public class ColumnStatsIndices {
LogicalType logicalType,
Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter>
converters) {
AvroToRowDataConverters.AvroToRowDataConverter converter =
- converters.computeIfAbsent(logicalType, k ->
AvroToRowDataConverters.createConverter(logicalType));
+ converters.computeIfAbsent(logicalType, k ->
AvroToRowDataConverters.createConverter(logicalType, true));
return converter.convert(rawVal);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 29bb0a06d8c..3690fc911d8 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -351,7 +351,7 @@ public class MergeOnReadInputFormat
final Schema requiredSchema = new
Schema.Parser().parse(tableState.getRequiredAvroSchema());
final GenericRecordBuilder recordBuilder = new
GenericRecordBuilder(requiredSchema);
final AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter =
-
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
+
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType(),
conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split,
tableSchema, internalSchemaManager.getQuerySchema(), conf, hadoopConf);
final Iterator<String> logRecordsKeyIterator =
scanner.getRecords().keySet().iterator();
final int[] pkOffset = tableState.getPkOffsetsInRequired();
@@ -431,7 +431,7 @@ public class MergeOnReadInputFormat
final Schema requiredSchema = new
Schema.Parser().parse(tableState.getRequiredAvroSchema());
final GenericRecordBuilder recordBuilder = new
GenericRecordBuilder(requiredSchema);
final AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter =
-
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
+
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType(),
conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
final FormatUtils.BoundedMemoryRecords records = new
FormatUtils.BoundedMemoryRecords(split, tableSchema,
internalSchemaManager.getQuerySchema(), hadoopConf, conf);
final Iterator<HoodieRecord<?>> recordsIterator =
records.getRecordsIterator();
@@ -478,7 +478,7 @@ public class MergeOnReadInputFormat
protected ClosableIterator<RowData>
getFullLogFileIterator(MergeOnReadInputSplit split) {
final Schema tableSchema = new
Schema.Parser().parse(tableState.getAvroSchema());
final AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter =
- AvroToRowDataConverters.createRowConverter(tableState.getRowType());
+ AvroToRowDataConverters.createRowConverter(tableState.getRowType(),
conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split,
tableSchema, InternalSchema.getEmptyInternalSchema(), conf, hadoopConf);
final Iterator<String> logRecordsKeyIterator =
scanner.getRecords().keySet().iterator();
@@ -736,7 +736,7 @@ public class MergeOnReadInputFormat
this.operationPos = operationPos;
this.avroProjection = avroProjection;
this.rowDataToAvroConverter =
RowDataToAvroConverters.createConverter(tableRowType,
flinkConf.getBoolean(FlinkOptions.WRITE_UTC_TIMEZONE));
- this.avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(requiredRowType);
+ this.avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(requiredRowType,
flinkConf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
this.projection = projection;
this.instantRange = split.getInstantRange().orElse(null);
List<String> mergers =
Arrays.stream(flinkConf.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(","))
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
index 38633b8ad9e..0caafca8259 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
@@ -43,6 +43,7 @@ import org.joda.time.DateTimeFieldType;
import java.io.Serializable;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
+import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
@@ -72,12 +73,15 @@ public class AvroToRowDataConverters {
// -------------------------------------------------------------------------------------
// Runtime Converters
// -------------------------------------------------------------------------------------
-
public static AvroToRowDataConverter createRowConverter(RowType rowType) {
+ return createRowConverter(rowType, true);
+ }
+
+ public static AvroToRowDataConverter createRowConverter(RowType rowType,
boolean utcTimezone) {
final AvroToRowDataConverter[] fieldConverters =
rowType.getFields().stream()
.map(RowType.RowField::getType)
- .map(AvroToRowDataConverters::createNullableConverter)
+ .map(type -> AvroToRowDataConverters.createNullableConverter(type,
utcTimezone))
.toArray(AvroToRowDataConverter[]::new);
final int arity = rowType.getFieldCount();
@@ -94,8 +98,8 @@ public class AvroToRowDataConverters {
/**
* Creates a runtime converter which is null safe.
*/
- private static AvroToRowDataConverter createNullableConverter(LogicalType
type) {
- final AvroToRowDataConverter converter = createConverter(type);
+ private static AvroToRowDataConverter createNullableConverter(LogicalType
type, boolean utcTimezone) {
+ final AvroToRowDataConverter converter = createConverter(type,
utcTimezone);
return avroObject -> {
if (avroObject == null) {
return null;
@@ -107,7 +111,7 @@ public class AvroToRowDataConverters {
/**
* Creates a runtime converter which assumes the input object is not null.
*/
- public static AvroToRowDataConverter createConverter(LogicalType type) {
+ public static AvroToRowDataConverter createConverter(LogicalType type,
boolean utcTimezone) {
switch (type.getTypeRoot()) {
case NULL:
return avroObject -> null;
@@ -129,9 +133,9 @@ public class AvroToRowDataConverters {
case TIME_WITHOUT_TIME_ZONE:
return AvroToRowDataConverters::convertToTime;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return createTimestampConverter(((LocalZonedTimestampType)
type).getPrecision());
+ return createTimestampConverter(((LocalZonedTimestampType)
type).getPrecision(), true);
case TIMESTAMP_WITHOUT_TIME_ZONE:
- return createTimestampConverter(((TimestampType) type).getPrecision());
+ return createTimestampConverter(((TimestampType) type).getPrecision(),
utcTimezone);
case CHAR:
case VARCHAR:
return avroObject -> StringData.fromString(avroObject.toString());
@@ -141,12 +145,12 @@ public class AvroToRowDataConverters {
case DECIMAL:
return createDecimalConverter((DecimalType) type);
case ARRAY:
- return createArrayConverter((ArrayType) type);
+ return createArrayConverter((ArrayType) type, utcTimezone);
case ROW:
- return createRowConverter((RowType) type);
+ return createRowConverter((RowType) type, utcTimezone);
case MAP:
case MULTISET:
- return createMapConverter(type);
+ return createMapConverter(type, utcTimezone);
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
@@ -170,9 +174,9 @@ public class AvroToRowDataConverters {
};
}
- private static AvroToRowDataConverter createArrayConverter(ArrayType
arrayType) {
+ private static AvroToRowDataConverter createArrayConverter(ArrayType
arrayType, boolean utcTimezone) {
final AvroToRowDataConverter elementConverter =
- createNullableConverter(arrayType.getElementType());
+ createNullableConverter(arrayType.getElementType(), utcTimezone);
final Class<?> elementClass =
LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
@@ -187,11 +191,11 @@ public class AvroToRowDataConverters {
};
}
- private static AvroToRowDataConverter createMapConverter(LogicalType type) {
+ private static AvroToRowDataConverter createMapConverter(LogicalType type,
boolean utcTimezone) {
final AvroToRowDataConverter keyConverter =
- createConverter(DataTypes.STRING().getLogicalType());
+ createConverter(DataTypes.STRING().getLogicalType(), utcTimezone);
final AvroToRowDataConverter valueConverter =
-
createNullableConverter(AvroSchemaConverter.extractValueTypeToAvroMap(type));
+
createNullableConverter(AvroSchemaConverter.extractValueTypeToAvroMap(type),
utcTimezone);
return avroObject -> {
final Map<?, ?> map = (Map<?, ?>) avroObject;
@@ -205,7 +209,7 @@ public class AvroToRowDataConverters {
};
}
- private static AvroToRowDataConverter createTimestampConverter(int
precision) {
+ private static AvroToRowDataConverter createTimestampConverter(int
precision, boolean utcTimezone) {
final ChronoUnit chronoUnit;
if (precision <= 3) {
chronoUnit = ChronoUnit.MILLIS;
@@ -233,7 +237,11 @@ public class AvroToRowDataConverters {
"Unexpected object type for TIMESTAMP logical type. Received: "
+ avroObject);
}
}
- return TimestampData.fromInstant(instant);
+ if (utcTimezone) {
+ return TimestampData.fromInstant(instant);
+ } else {
+ return TimestampData.fromTimestamp(Timestamp.from(instant)); // this
applies the local timezone
+ }
};
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index ac38d92a577..bf050fe399d 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -480,7 +480,7 @@ public class ITTestHoodieDataSource {
}
@ParameterizedTest
- @MethodSource("tableTypeAndPartitioningParams")
+ @MethodSource("tableTypeAndBooleanTrueFalseParams")
void testStreamReadFilterByPartition(HoodieTableType tableType, boolean
hiveStylePartitioning) throws Exception {
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.TABLE_NAME, "t1");
@@ -568,7 +568,7 @@ public class ITTestHoodieDataSource {
}
@ParameterizedTest
- @MethodSource("tableTypeAndPartitioningParams")
+ @MethodSource("tableTypeAndBooleanTrueFalseParams")
void testWriteAndReadWithProctimeSequence(HoodieTableType tableType, boolean
hiveStylePartitioning) {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
@@ -591,7 +591,7 @@ public class ITTestHoodieDataSource {
}
@ParameterizedTest
- @MethodSource("tableTypeAndPartitioningParams")
+ @MethodSource("tableTypeAndBooleanTrueFalseParams")
void
testWriteAndReadWithProctimeSequenceWithTsColumnExisting(HoodieTableType
tableType, boolean hiveStylePartitioning) {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
@@ -641,7 +641,7 @@ public class ITTestHoodieDataSource {
}
@ParameterizedTest
- @MethodSource("tableTypeAndPartitioningParams")
+ @MethodSource("tableTypeAndBooleanTrueFalseParams")
void testBatchModeUpsert(HoodieTableType tableType, boolean
hiveStylePartitioning) {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
@@ -1836,8 +1836,8 @@ public class ITTestHoodieDataSource {
}
@ParameterizedTest
- @EnumSource(value = HoodieTableType.class)
- void testWriteReadWithTimestampWithoutTZ(HoodieTableType tableType) {
+ @MethodSource("tableTypeAndBooleanTrueFalseParams")
+ void testWriteReadWithTimestampWithoutTZ(HoodieTableType tableType, boolean
readUtcTimezone) {
TableEnvironment tableEnv = batchTableEnv;
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles"));
String createTable = sql("t1")
@@ -1849,8 +1849,7 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.PRECOMBINE_FIELD, "f1")
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.WRITE_UTC_TIMEZONE, false)
- //FlinkOptions.READ_UTC_TIMEZONE doesn't affect in
MergeOnReadInputFormat since the option isn't supported in
AvroToRowDataConverters
- //.option(FlinkOptions.READ_UTC_TIMEZONE, false)
+ .option(FlinkOptions.READ_UTC_TIMEZONE, readUtcTimezone)
.pkField("f0")
.noPartition()
.end();
@@ -1872,15 +1871,17 @@ public class ITTestHoodieDataSource {
List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");
+
+ final ZoneId expectedZoneId = readUtcTimezone ? ZoneId.of("UTC") :
ZoneId.systemDefault();
final String expected = "["
+ "+I[1"
+ ", abc"
- + ", " +
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis +
1000), ZoneId.of("UTC")))
- + ", " +
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis +
2000), ZoneId.of("UTC"))) + "], "
+ + ", " +
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis +
1000), expectedZoneId))
+ + ", " +
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis +
2000), expectedZoneId)) + "], "
+ "+I[2"
+ ", def"
- + ", " +
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis +
3000), ZoneId.of("UTC")))
- + ", " +
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis +
4000), ZoneId.of("UTC"))) + "]]";
+ + ", " +
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis +
3000), expectedZoneId))
+ + ", " +
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis +
4000), expectedZoneId)) + "]]";
assertRowsEquals(result, expected);
}
@@ -2015,7 +2016,7 @@ public class ITTestHoodieDataSource {
}
@ParameterizedTest
- @MethodSource("tableTypeAndPartitioningParams")
+ @MethodSource("tableTypeAndBooleanTrueFalseParams")
void testDynamicPartitionPrune(HoodieTableType tableType, boolean
hiveStylePartitioning) throws Exception {
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.TABLE_NAME, "t1");
@@ -2147,9 +2148,9 @@ public class ITTestHoodieDataSource {
}
/**
- * Return test params => (HoodieTableType, hive style partitioning).
+ * Return test params => (HoodieTableType, true/false).
*/
- private static Stream<Arguments> tableTypeAndPartitioningParams() {
+ private static Stream<Arguments> tableTypeAndBooleanTrueFalseParams() {
Object[][] data =
new Object[][] {
{HoodieTableType.COPY_ON_WRITE, false},