This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 60576bcccf08 [HUDI-9574] Avoid creating InternalSchemaManager for each file in FileGroup reader based Flink Compaction (#13535)
60576bcccf08 is described below

commit 60576bcccf08d07abb9d1ee41cf52656fc491bbf
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Jul 11 09:46:49 2025 +0800

    [HUDI-9574] Avoid creating InternalSchemaManager for each file in FileGroup reader based Flink Compaction (#13535)
---
 .../hudi/sink/bootstrap/BootstrapOperator.java     |  2 +-
 .../apache/hudi/sink/compact/CompactOperator.java  | 70 +++++++++++++++-------
 .../org/apache/hudi/table/HoodieTableSource.java   |  2 +-
 .../table/format/FlinkReaderContextFactory.java    | 16 +++--
 .../hudi/table/format/InternalSchemaManager.java   | 28 ++++-----
 5 files changed, 73 insertions(+), 45 deletions(-)
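
In essence, the change replaces an eager, per-file call to InternalSchemaManager.get(...) with a memoized Supplier: the manager is built on first use and then reused until the compaction instant changes. A minimal sketch of that caching pattern, reusing the patched InternalSchemaManager.get(StorageConfiguration, HoodieTableMetaClient) entry point but with an otherwise hypothetical holder class (illustration only, not the committed code):

    import java.util.function.Supplier;

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.table.format.InternalSchemaManager;

    final class CachedSchemaManagerSupplier {  // hypothetical helper, not part of the patch
      private InternalSchemaManager cached;  // built lazily, at most once per compaction instant

      Supplier<InternalSchemaManager> supplier(HoodieTableMetaClient metaClient, boolean instantChanged) {
        return () -> {
          // scan the timeline only on first use, or when a new compaction instant arrives
          if (cached == null || instantChanged) {
            cached = InternalSchemaManager.get(metaClient.getStorageConf(), metaClient);
          }
          return cached;
        };
      }
    }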

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index a44ce02272d7..9fe19d20741b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -133,7 +133,7 @@ public class BootstrapOperator
     this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
     this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
     this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
-    this.internalSchemaManager = InternalSchemaManager.get(conf, metaClient);
+    this.internalSchemaManager = InternalSchemaManager.get(hoodieTable.getStorageConf(), metaClient);
 
     preLoadIndexRecords();
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
index 7956fa150817..db39f5fda448 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
@@ -101,14 +101,24 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
   private transient StreamRecordCollector<CompactionCommitEvent> collector;
 
   /**
-   * Reader context.
+   * Compaction metrics.
    */
-  private transient Option<HoodieReaderContext<?>> readerContextOpt;
+  private transient FlinkCompactionMetrics compactionMetrics;
 
   /**
-   * Compaction metrics.
+   * Previous compact instant time.
    */
-  private transient FlinkCompactionMetrics compactionMetrics;
+  private transient String prevCompactInstant = "";
+
+  /**
+   * Whether FileGroup reader based compaction should be used;
+   */
+  private transient boolean useFileGroupReaderBasedCompaction;
+
+  /**
+   * InternalSchema manager used for handling schema evolution.
+   */
+  private transient InternalSchemaManager internalSchemaManager;
 
   public CompactOperator(Configuration conf) {
     this.conf = conf;
@@ -125,11 +135,16 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
     this.flinkTable = this.writeClient.getHoodieTable();
-    this.readerContextOpt = initReaderContext(this.writeClient);
     if (this.asyncCompaction) {
       this.executor = NonThrownExecutor.builder(LOG).build();
     }
     this.collector = new StreamRecordCollector<>(output);
+    HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
+    this.useFileGroupReaderBasedCompaction =
+        !metaClient.isMetadataTable()
+            && writeClient.getConfig().getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
+            && writeClient.getConfig().populateMetaFields()                                                       // Virtual key support by fg reader is not ready
+            && !(metaClient.getTableConfig().isCDCEnabled() && writeClient.getConfig().isYieldingPureLogForMor());  // do not support produce cdc log during fg reader
     registerMetrics();
   }
 
@@ -138,28 +153,33 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
     final CompactionPlanEvent event = record.getValue();
     final String instantTime = event.getCompactionInstantTime();
     final CompactionOperation compactionOperation = event.getOperation();
+    boolean needReloadMetaClient = !instantTime.equals(prevCompactInstant);
+    prevCompactInstant = instantTime;
     if (asyncCompaction) {
       // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
       executor.execute(
-          () -> doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig()),
+          () -> doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig(), needReloadMetaClient),
           (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
           "Execute compaction for instant %s from task %d", instantTime, taskID);
     } else {
       // executes the compaction task synchronously for batch mode.
       LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
-      doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
+      doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig(), needReloadMetaClient);
     }
   }
 
   private void doCompaction(String instantTime,
                             CompactionOperation compactionOperation,
                             Collector<CompactionCommitEvent> collector,
-                            HoodieWriteConfig writeConfig) throws Exception {
+                            HoodieWriteConfig writeConfig,
+                            boolean needReloadMetaClient) throws Exception {
     compactionMetrics.startCompaction();
     HoodieFlinkMergeOnReadTableCompactor<?> compactor = new HoodieFlinkMergeOnReadTableCompactor<>();
     HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
-    // reload the timeline
-    metaClient.reload();
+    if (needReloadMetaClient) {
+      // reload the timeline
+      metaClient.reload();
+    }
     // schema evolution
     CompactionUtil.setAvroSchema(writeConfig, metaClient);
     List<WriteStatus> writeStatuses = compactor.compact(
@@ -168,28 +188,32 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
         compactionOperation,
         instantTime,
         flinkTable.getTaskContextSupplier(),
-        readerContextOpt,
+        createReaderContext(writeClient, needReloadMetaClient),
         flinkTable);
     compactionMetrics.endCompaction();
     collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
   }
 
-  private Option<HoodieReaderContext<?>> initReaderContext(HoodieFlinkWriteClient<?> writeClient) {
-    HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
-    boolean useFileGroupReaderBasedCompaction = !metaClient.isMetadataTable()
-        && writeClient.getConfig().getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
-        && writeClient.getConfig().populateMetaFields()                                                           // Virtual key support by fg reader is not ready
-        && !(metaClient.getTableConfig().isCDCEnabled() && writeClient.getConfig().isYieldingPureLogForMor());  // do not support produce cdc log during fg reader
+  private Option<HoodieReaderContext<?>> createReaderContext(HoodieFlinkWriteClient<?> writeClient, boolean needReloadMetaClient) {
     if (useFileGroupReaderBasedCompaction) {
-      // CAUTION: reuse the meta client so that the timeline is updated
-      Supplier<InternalSchemaManager> internalSchemaManager = () -> InternalSchemaManager.get(conf, metaClient);
+      HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
+      // CAUTION: InternalSchemaManager will scan timeline, reusing the meta client so that the timeline is updated.
+      // Instantiate internalSchemaManager lazily here since it may not be needed for FG reader, e.g., schema evolution
+      // for log files in FG reader do not use internalSchemaManager.
+      Supplier<InternalSchemaManager> internalSchemaManagerSupplier = () -> {
+        if (internalSchemaManager == null || needReloadMetaClient) {
+          internalSchemaManager = InternalSchemaManager.get(metaClient.getStorageConf(), metaClient);
+        }
+        return internalSchemaManager;
+      };
+
       // initialize storage conf lazily.
       StorageConfiguration<?> readerConf = writeClient.getEngineContext().getStorageConf();
-      return Option.of(new FlinkRowDataReaderContext(readerConf, internalSchemaManager, Collections.emptyList(), metaClient.getTableConfig(), Option.empty()));
-    } else {
-      // always using avro record merger for legacy compaction since log scanner do not support rowdata reading yet.
-      writeClient.getConfig().setRecordMergerClass(HoodieAvroRecordMerger.class.getName());
+      return Option.of(new FlinkRowDataReaderContext(
+          readerConf, internalSchemaManagerSupplier, Collections.emptyList(), metaClient.getTableConfig(), Option.empty()));
     }
+    // always using avro record merger for legacy compaction since log scanner do not support rowdata reading yet.
+    writeClient.getConfig().setRecordMergerClass(HoodieAvroRecordMerger.class.getName());
     return Option.empty();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index a764d8f175cd..8c2c27a23412 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -199,7 +199,7 @@ public class HoodieTableSource implements
     this.hadoopConf = new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
     this.metaClient = Option.ofNullable(metaClient).orElseGet(() -> StreamerUtil.metaClientForReader(conf, this.hadoopConf.unwrap()));
     this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
-    this.internalSchemaManager = Option.ofNullable(internalSchemaManager).orElseGet(() -> InternalSchemaManager.get(this.conf, this.metaClient));
+    this.internalSchemaManager = Option.ofNullable(internalSchemaManager).orElseGet(() -> InternalSchemaManager.get(this.hadoopConf, this.metaClient));
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkReaderContextFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkReaderContextFactory.java
index fe66ee66cb57..b4a720675cc5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkReaderContextFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkReaderContextFactory.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 
 import java.util.Collections;
@@ -35,6 +34,7 @@ import java.util.function.Supplier;
  */
 public class FlinkReaderContextFactory implements ReaderContextFactory<RowData> {
   private final HoodieTableMetaClient metaClient;
+  private InternalSchemaManager internalSchemaManager;
 
   public FlinkReaderContextFactory(HoodieTableMetaClient metaClient) {
     this.metaClient = metaClient;
@@ -42,9 +42,15 @@ public class FlinkReaderContextFactory implements ReaderContextFactory<RowData>
 
   @Override
   public HoodieReaderContext<RowData> getContext() {
-    Supplier<InternalSchemaManager> internalSchemaManager = () -> InternalSchemaManager.get(metaClient.getStorageConf().unwrapAs(Configuration.class), metaClient);
-
-    return new FlinkRowDataReaderContext(metaClient.getStorageConf(), internalSchemaManager,
-        Collections.emptyList(), metaClient.getTableConfig(), Option.empty());
+    Supplier<InternalSchemaManager> internalSchemaManagerSupplier = () -> {
+      // CAUTION: instantiate internalSchemaManager lazily here since it may not be needed for FG reader,
+      // e.g., schema evolution for log files in FG reader do not use internalSchemaManager.
+      if (internalSchemaManager == null) {
+        internalSchemaManager = InternalSchemaManager.get(metaClient.getStorageConf(), metaClient);
+      }
+      return internalSchemaManager;
+    };
+    return new FlinkRowDataReaderContext(
+        metaClient.getStorageConf(), internalSchemaManagerSupplier, Collections.emptyList(), metaClient.getTableConfig(), Option.empty());
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
index 35d0b49cf1a7..6ee6a8d81d53 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.format;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -28,18 +29,16 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.configuration.HadoopConfigurations;
-import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.Type;
 import org.apache.hudi.internal.schema.Types;
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.util.AvroSchemaConverter;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
@@ -64,16 +63,15 @@ public class InternalSchemaManager implements Serializable {
   public static final InternalSchemaManager DISABLED = new InternalSchemaManager(null, InternalSchema.getEmptyInternalSchema(), null, null,
       TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION), null);
 
-  private final Configuration conf;
   private final InternalSchema querySchema;
   private final String validCommits;
   private final String tablePath;
   private final TimelineLayout layout;
   private final HoodieTableConfig tableConfig;
-  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+  private final StorageConfiguration<?> storageConf;
 
-  public static InternalSchemaManager get(Configuration conf, HoodieTableMetaClient metaClient) {
-    if (!OptionsResolver.isSchemaEvolutionEnabled(conf)) {
+  public static InternalSchemaManager get(StorageConfiguration<?> conf, HoodieTableMetaClient metaClient) {
+    if (!isSchemaEvolutionEnabled(conf)) {
       return DISABLED;
     }
     Option<InternalSchema> internalSchema = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
@@ -91,9 +89,9 @@ public class InternalSchemaManager implements Serializable {
     return new InternalSchemaManager(conf, internalSchema.get(), validCommits, metaClient.getBasePath().toString(), metaClient.getTimelineLayout(), metaClient.getTableConfig());
   }
 
-  public InternalSchemaManager(Configuration conf, InternalSchema querySchema, String validCommits, String tablePath,
+  public InternalSchemaManager(StorageConfiguration<?> storageConf, InternalSchema querySchema, String validCommits, String tablePath,
                                TimelineLayout layout, HoodieTableConfig tableConfig) {
-    this.conf = conf;
+    this.storageConf = storageConf;
     this.querySchema = querySchema;
     this.validCommits = validCommits;
     this.tablePath = tablePath;
@@ -123,7 +121,7 @@ public class InternalSchemaManager implements Serializable {
     long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(fileName));
     InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(
         commitInstantTime, tablePath,
-        new HoodieHadoopStorage(tablePath, getHadoopConf()),
+        new HoodieHadoopStorage(tablePath, storageConf),
         validCommits, layout, tableConfig);
     if (querySchema.equals(fileSchema)) {
       return InternalSchema.getEmptyInternalSchema();
@@ -228,10 +226,10 @@ public class InternalSchemaManager implements Serializable {
     return Collections.unmodifiableMap(posProxy);
   }
 
-  private org.apache.hadoop.conf.Configuration getHadoopConf() {
-    if (hadoopConf == null) {
-      hadoopConf = HadoopConfigurations.getHadoopConf(conf);
-    }
-    return hadoopConf;
+  /**
+   * Returns whether comprehensive schema evolution enabled.
+   */
+  private static boolean isSchemaEvolutionEnabled(StorageConfiguration<?> conf) {
+    return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue());
   }
 }
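
For reference, after this patch the schema-evolution check in InternalSchemaManager reads from the StorageConfiguration instead of the Flink Configuration, so callers only need a meta client. A short usage sketch (the SchemaEvolutionExample wrapper below is hypothetical; the get(...) call mirrors the patched signature):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.storage.StorageConfiguration;
    import org.apache.hudi.table.format.InternalSchemaManager;

    final class SchemaEvolutionExample {  // hypothetical helper, not part of the patch
      static InternalSchemaManager schemaManagerFor(HoodieTableMetaClient metaClient) {
        // The patched get(...) reads the flag keyed by HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE
        // from the storage configuration; when it is disabled, InternalSchemaManager.DISABLED is
        // returned and no timeline scan happens.
        StorageConfiguration<?> storageConf = metaClient.getStorageConf();
        return InternalSchemaManager.get(storageConf, metaClient);
      }
    }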
