This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch release-0.12.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 528aaf7bc6a2307b17b3e178c7bbddc2d10fcb82 Author: 冯健 <[email protected]> AuthorDate: Tue Aug 9 10:10:30 2022 +0800 [HUDI-4572] Fix 'Not a valid schema field: ts' error in HoodieFlinkCompactor if precombine field is not ts (#6331) Co-authored-by: jian.feng <[email protected]> --- .../org/apache/hudi/sink/compact/HoodieFlinkCompactor.java | 2 ++ .../src/main/java/org/apache/hudi/util/CompactionUtil.java | 14 ++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index e2d2972a0d..92c73e2b9f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -173,6 +173,8 @@ public class HoodieFlinkCompactor { // set table schema CompactionUtil.setAvroSchema(conf, metaClient); + CompactionUtil.setPreCombineField(conf, metaClient); + // infer changelog mode CompactionUtil.inferChangelogMode(conf, metaClient); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index 3d386cf8cc..b0bac433b6 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -119,6 +119,20 @@ public class CompactionUtil { writeConfig.setSchema(tableAvroSchema.toString()); } + /** + * Sets up the preCombine field into the given configuration {@code conf} + * through reading from the hoodie table metadata. + * + * This value is non-null as compaction can only be performed on MOR tables. + * Of which, MOR tables will have non-null precombine fields. + * + * @param conf The configuration + */ + public static void setPreCombineField(Configuration conf, HoodieTableMetaClient metaClient) { + String preCombineField = metaClient.getTableConfig().getPreCombineField(); + conf.setString(FlinkOptions.PRECOMBINE_FIELD, preCombineField); + } + /** * Infers the changelog mode based on the data file schema(including metadata fields). *
