This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f4b2782886 [HUDI-4572] Fix 'Not a valid schema field: ts' error in
HoodieFlinkCompactor if precombine field is not ts (#6331)
f4b2782886 is described below
commit f4b2782886b9f01f7b1cce1ef2c06752ff1573e4
Author: 冯健 <[email protected]>
AuthorDate: Tue Aug 9 10:10:30 2022 +0800
[HUDI-4572] Fix 'Not a valid schema field: ts' error in
HoodieFlinkCompactor if precombine field is not ts (#6331)
Co-authored-by: jian.feng <[email protected]>
---
.../org/apache/hudi/sink/compact/HoodieFlinkCompactor.java | 2 ++
.../src/main/java/org/apache/hudi/util/CompactionUtil.java | 14 ++++++++++++++
2 files changed, 16 insertions(+)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index e2d2972a0d..92c73e2b9f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -173,6 +173,8 @@ public class HoodieFlinkCompactor {
// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);
+ CompactionUtil.setPreCombineField(conf, metaClient);
+
// infer changelog mode
CompactionUtil.inferChangelogMode(conf, metaClient);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 3d386cf8cc..b0bac433b6 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -119,6 +119,20 @@ public class CompactionUtil {
writeConfig.setSchema(tableAvroSchema.toString());
}
+ /**
+ * Sets up the preCombine field into the given configuration {@code conf}
+ * through reading from the hoodie table metadata.
+ *
+ * This value is non-null because compaction can only be performed on MOR tables,
+ * and MOR tables always have a non-null precombine field.
+ *
+ * @param conf The configuration
+ */
+ public static void setPreCombineField(Configuration conf,
HoodieTableMetaClient metaClient) {
+ String preCombineField = metaClient.getTableConfig().getPreCombineField();
+ conf.setString(FlinkOptions.PRECOMBINE_FIELD, preCombineField);
+ }
+
/**
* Infers the changelog mode based on the data file schema(including
metadata fields).
*