This is an automated email from the ASF dual-hosted git repository.
wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d87b8599ac4 [HUDI-8785] Missed set of `POPULATE_META_FIELDS` parameter
during table initialization by Flink (#12731)
d87b8599ac4 is described below
commit d87b8599ac4c4c9f1253339a5c0610ba5f801ff6
Author: Geser Dugarov <[email protected]>
AuthorDate: Fri Jan 31 09:45:49 2025 +0700
[HUDI-8785] Missed set of `POPULATE_META_FIELDS` parameter during table
initialization by Flink (#12731)
---
.../java/org/apache/hudi/configuration/OptionsResolver.java | 11 +++++++++++
.../src/main/java/org/apache/hudi/util/StreamerUtil.java | 1 +
.../java/org/apache/hudi/table/ITTestHoodieDataSource.java | 5 ++---
3 files changed, 14 insertions(+), 3 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 9d4e289cb20..59cb4eb3432 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -27,6 +27,7 @@ import
org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.StringUtils;
@@ -375,6 +376,16 @@ public class OptionsResolver {
return conf.getBoolean(FlinkOptions.READ_CDC_FROM_CHANGELOG);
}
+ /**
+ * Returns whether to populate meta fields or not.
+ */
+ public static boolean isPopulateMetaFields(Configuration conf) {
+ return Boolean.parseBoolean(
+ conf.getString(
+ HoodieTableConfig.POPULATE_META_FIELDS.key(),
+ HoodieTableConfig.POPULATE_META_FIELDS.defaultValue().toString()));
+ }
+
/**
* Returns the index type.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 7f7d5f3855f..a68abd38629 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -270,6 +270,7 @@ public class StreamerUtil {
.setUrlEncodePartitioning(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING))
.setCDCEnabled(conf.getBoolean(FlinkOptions.CDC_ENABLED))
.setCDCSupplementalLoggingMode(conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE))
+ .setPopulateMetaFields(OptionsResolver.isPopulateMetaFields(conf))
.initTable(HadoopFSUtils.getStorageConfWithCopy(hadoopConf),
basePath);
LOG.info("Table initialized under base path {}", basePath);
} else {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 3b5c689554b..a2a31b492e9 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -2450,9 +2450,8 @@ public class ITTestHoodieDataSource {
private static Stream<Arguments> parametersForMetaColumnsSkip() {
Object[][] data =
new Object[][] {
- {HoodieTableType.COPY_ON_WRITE, WriteOperationType.INSERT}
- // add MOR upsert check after fixing of HUDI-8785
- // {HoodieTableType.MERGE_ON_READ, WriteOperationType.UPSERT}
+ {HoodieTableType.COPY_ON_WRITE, WriteOperationType.INSERT},
+ {HoodieTableType.MERGE_ON_READ, WriteOperationType.UPSERT}
};
return Stream.of(data).map(Arguments::of);
}