This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b2da9f  [HUDI-2631] In CompactFunction, set up the write schema each 
time with the latest schema (#4000)
3b2da9f is described below

commit 3b2da9f13847475be3dcef13b3d25df8818cecc7
Author: yuzhaojing <[email protected]>
AuthorDate: Wed Mar 2 11:18:17 2022 +0800

    [HUDI-2631] In CompactFunction, set up the write schema each time with the 
latest schema (#4000)
    
    Co-authored-by: yuzhaojing <[email protected]>
---
 .../apache/hudi/sink/compact/CompactFunction.java  | 23 ++++++++++++++++------
 .../java/org/apache/hudi/util/CompactionUtil.java  | 13 ++++++++++++
 2 files changed, 30 insertions(+), 6 deletions(-)

diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
index 560b5ff..a43fcd5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
@@ -21,9 +21,11 @@ package org.apache.hudi.sink.compact;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
 import 
org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
+import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -51,7 +53,7 @@ public class CompactFunction extends 
ProcessFunction<CompactionPlanEvent, Compac
   /**
    * Write Client.
    */
-  private transient HoodieFlinkWriteClient writeClient;
+  private transient HoodieFlinkWriteClient<?> writeClient;
 
   /**
    * Whether to execute compaction asynchronously.
@@ -89,21 +91,24 @@ public class CompactFunction extends 
ProcessFunction<CompactionPlanEvent, Compac
     if (asyncCompaction) {
       // executes the compaction task asynchronously to not block the 
checkpoint barrier propagate.
       executor.execute(
-          () -> doCompaction(instantTime, compactionOperation, collector),
+          () -> doCompaction(instantTime, compactionOperation, collector, 
reloadWriteConfig()),
           (errMsg, t) -> collector.collect(new 
CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
           "Execute compaction for instant %s from task %d", instantTime, 
taskID);
     } else {
       // executes the compaction task synchronously for batch mode.
       LOG.info("Execute compaction for instant {} from task {}", instantTime, 
taskID);
-      doCompaction(instantTime, compactionOperation, collector);
+      doCompaction(instantTime, compactionOperation, collector, 
writeClient.getConfig());
     }
   }
 
-  private void doCompaction(String instantTime, CompactionOperation 
compactionOperation, Collector<CompactionCommitEvent> collector) throws 
IOException {
-    HoodieFlinkMergeOnReadTableCompactor compactor = new 
HoodieFlinkMergeOnReadTableCompactor();
+  private void doCompaction(String instantTime,
+                            CompactionOperation compactionOperation,
+                            Collector<CompactionCommitEvent> collector,
+                            HoodieWriteConfig writeConfig) throws IOException {
+    HoodieFlinkMergeOnReadTableCompactor<?> compactor = new 
HoodieFlinkMergeOnReadTableCompactor<>();
     List<WriteStatus> writeStatuses = compactor.compact(
         new HoodieFlinkCopyOnWriteTable<>(
-            writeClient.getConfig(),
+            writeConfig,
             writeClient.getEngineContext(),
             writeClient.getHoodieTable().getMetaClient()),
         writeClient.getHoodieTable().getMetaClient(),
@@ -114,6 +119,12 @@ public class CompactFunction extends 
ProcessFunction<CompactionPlanEvent, Compac
     collector.collect(new CompactionCommitEvent(instantTime, 
compactionOperation.getFileId(), writeStatuses, taskID));
   }
 
+  private HoodieWriteConfig reloadWriteConfig() throws Exception {
+    HoodieWriteConfig writeConfig = writeClient.getConfig();
+    CompactionUtil.setAvroSchema(writeConfig, 
writeClient.getHoodieTable().getMetaClient());
+    return writeConfig;
+  }
+
   @VisibleForTesting
   public void setExecutor(NonThrownExecutor executor) {
     this.executor = executor;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java 
b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index d04937b..74629f9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -26,6 +26,7 @@ import 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.sink.compact.FlinkCompactionConfig;
@@ -107,6 +108,18 @@ public class CompactionUtil {
   }
 
   /**
+   * Sets up the Avro schema string into the given write config {@code 
+ HoodieWriteConfig}
+   * by reading it from the hoodie table metadata.
+   *
+   * @param writeConfig The HoodieWriteConfig
+   */
+  public static void setAvroSchema(HoodieWriteConfig writeConfig, 
HoodieTableMetaClient metaClient) throws Exception {
+    TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
+    Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
+    writeConfig.setSchema(tableAvroSchema.toString());
+  }
+
+  /**
    * Infers the changelog mode based on the data file schema(including 
metadata fields).
    *
    * <p>We can improve the code if the changelog mode is set up as table 
config.

Reply via email to