This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 60576bcccf08 [HUDI-9574] Avoid creating InternalSchemaManager for each file in FileGroup reader based Flink Compaction (#13535)
60576bcccf08 is described below
commit 60576bcccf08d07abb9d1ee41cf52656fc491bbf
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Jul 11 09:46:49 2025 +0800
    [HUDI-9574] Avoid creating InternalSchemaManager for each file in FileGroup reader based Flink Compaction (#13535)
---
.../hudi/sink/bootstrap/BootstrapOperator.java | 2 +-
.../apache/hudi/sink/compact/CompactOperator.java | 70 +++++++++++++++-------
.../org/apache/hudi/table/HoodieTableSource.java | 2 +-
.../table/format/FlinkReaderContextFactory.java | 16 +++--
.../hudi/table/format/InternalSchemaManager.java | 28 ++++-----
5 files changed, 73 insertions(+), 45 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index a44ce02272d7..9fe19d20741b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -133,7 +133,7 @@ public class BootstrapOperator
     this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
     this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
     this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
-    this.internalSchemaManager = InternalSchemaManager.get(conf, metaClient);
+    this.internalSchemaManager = InternalSchemaManager.get(hoodieTable.getStorageConf(), metaClient);
     preLoadIndexRecords();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
index 7956fa150817..db39f5fda448 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
@@ -101,14 +101,24 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
   private transient StreamRecordCollector<CompactionCommitEvent> collector;
 
   /**
-   * Reader context.
+   * Compaction metrics.
    */
-  private transient Option<HoodieReaderContext<?>> readerContextOpt;
+  private transient FlinkCompactionMetrics compactionMetrics;
 
   /**
-   * Compaction metrics.
+   * Previous compact instant time.
    */
-  private transient FlinkCompactionMetrics compactionMetrics;
+  private transient String prevCompactInstant = "";
+
+  /**
+   * Whether FileGroup reader based compaction should be used;
+   */
+  private transient boolean useFileGroupReaderBasedCompaction;
+
+  /**
+   * InternalSchema manager used for handling schema evolution.
+   */
+  private transient InternalSchemaManager internalSchemaManager;
 
   public CompactOperator(Configuration conf) {
     this.conf = conf;
@@ -125,11 +135,16 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
     this.flinkTable = this.writeClient.getHoodieTable();
-    this.readerContextOpt = initReaderContext(this.writeClient);
     if (this.asyncCompaction) {
       this.executor = NonThrownExecutor.builder(LOG).build();
     }
     this.collector = new StreamRecordCollector<>(output);
+    HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
+    this.useFileGroupReaderBasedCompaction =
+        !metaClient.isMetadataTable()
+            && writeClient.getConfig().getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
+            && writeClient.getConfig().populateMetaFields() // Virtual key support by fg reader is not ready
+            && !(metaClient.getTableConfig().isCDCEnabled() && writeClient.getConfig().isYieldingPureLogForMor()); // do not support produce cdc log during fg reader
 
     registerMetrics();
   }
@@ -138,28 +153,33 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
     final CompactionPlanEvent event = record.getValue();
     final String instantTime = event.getCompactionInstantTime();
     final CompactionOperation compactionOperation = event.getOperation();
+    boolean needReloadMetaClient = !instantTime.equals(prevCompactInstant);
+    prevCompactInstant = instantTime;
     if (asyncCompaction) {
       // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
       executor.execute(
-          () -> doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig()),
+          () -> doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig(), needReloadMetaClient),
           (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
           "Execute compaction for instant %s from task %d", instantTime, taskID);
     } else {
       // executes the compaction task synchronously for batch mode.
       LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
-      doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
+      doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig(), needReloadMetaClient);
     }
   }
 
   private void doCompaction(String instantTime,
                             CompactionOperation compactionOperation,
                             Collector<CompactionCommitEvent> collector,
-                            HoodieWriteConfig writeConfig) throws Exception {
+                            HoodieWriteConfig writeConfig,
+                            boolean needReloadMetaClient) throws Exception {
     compactionMetrics.startCompaction();
     HoodieFlinkMergeOnReadTableCompactor<?> compactor = new HoodieFlinkMergeOnReadTableCompactor<>();
     HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
-    // reload the timeline
-    metaClient.reload();
+    if (needReloadMetaClient) {
+      // reload the timeline
+      metaClient.reload();
+    }
     // schema evolution
     CompactionUtil.setAvroSchema(writeConfig, metaClient);
     List<WriteStatus> writeStatuses = compactor.compact(
@@ -168,28 +188,32 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
         compactionOperation,
         instantTime,
         flinkTable.getTaskContextSupplier(),
-        readerContextOpt,
+        createReaderContext(writeClient, needReloadMetaClient),
         flinkTable);
     compactionMetrics.endCompaction();
     collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
   }
 
-  private Option<HoodieReaderContext<?>> initReaderContext(HoodieFlinkWriteClient<?> writeClient) {
-    HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
-    boolean useFileGroupReaderBasedCompaction = !metaClient.isMetadataTable()
-        && writeClient.getConfig().getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
-        && writeClient.getConfig().populateMetaFields() // Virtual key support by fg reader is not ready
-        && !(metaClient.getTableConfig().isCDCEnabled() && writeClient.getConfig().isYieldingPureLogForMor()); // do not support produce cdc log during fg reader
+  private Option<HoodieReaderContext<?>> createReaderContext(HoodieFlinkWriteClient<?> writeClient, boolean needReloadMetaClient) {
     if (useFileGroupReaderBasedCompaction) {
-      // CAUTION: reuse the meta client so that the timeline is updated
-      Supplier<InternalSchemaManager> internalSchemaManager = () -> InternalSchemaManager.get(conf, metaClient);
+      HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
+      // CAUTION: InternalSchemaManager will scan timeline, reusing the meta client so that the timeline is updated.
+      // Instantiate internalSchemaManager lazily here since it may not be needed for FG reader, e.g., schema evolution
+      // for log files in FG reader do not use internalSchemaManager.
+      Supplier<InternalSchemaManager> internalSchemaManagerSupplier = () -> {
+        if (internalSchemaManager == null || needReloadMetaClient) {
+          internalSchemaManager = InternalSchemaManager.get(metaClient.getStorageConf(), metaClient);
+        }
+        return internalSchemaManager;
+      };
+
       // initialize storage conf lazily.
       StorageConfiguration<?> readerConf = writeClient.getEngineContext().getStorageConf();
-      return Option.of(new FlinkRowDataReaderContext(readerConf, internalSchemaManager, Collections.emptyList(), metaClient.getTableConfig(), Option.empty()));
-    } else {
-      // always using avro record merger for legacy compaction since log scanner do not support rowdata reading yet.
-      writeClient.getConfig().setRecordMergerClass(HoodieAvroRecordMerger.class.getName());
+      return Option.of(new FlinkRowDataReaderContext(
+          readerConf, internalSchemaManagerSupplier, Collections.emptyList(), metaClient.getTableConfig(), Option.empty()));
     }
+    // always using avro record merger for legacy compaction since log scanner do not support rowdata reading yet.
+    writeClient.getConfig().setRecordMergerClass(HoodieAvroRecordMerger.class.getName());
     return Option.empty();
   }
 
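The CompactOperator change above boils down to a memoized supplier: the InternalSchemaManager is created lazily on first use and reused across all file groups of the same compaction instant, instead of being rebuilt per file. A minimal standalone sketch of that pattern follows; the class and method names are illustrative and not part of the patch.

import java.util.function.Supplier;

// Sketch only: mirrors how CompactOperator caches the manager and rebuilds it
// only when the compaction instant changes (needReload == true).
class LazyManagerHolder<T> {
  private T cached;

  Supplier<T> supplierFor(boolean needReload, Supplier<T> factory) {
    return () -> {
      if (cached == null || needReload) {
        // e.g. InternalSchemaManager.get(metaClient.getStorageConf(), metaClient)
        cached = factory.get();
      }
      return cached;
    };
  }
}

Because the supplier is only invoked when the file group reader actually needs schema-evolution handling (per the comments above, log-file reads do not), the timeline scan inside InternalSchemaManager.get is skipped whenever it is not required.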
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index a764d8f175cd..8c2c27a23412 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -199,7 +199,7 @@ public class HoodieTableSource implements
     this.hadoopConf = new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
     this.metaClient = Option.ofNullable(metaClient).orElseGet(() -> StreamerUtil.metaClientForReader(conf, this.hadoopConf.unwrap()));
     this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
-    this.internalSchemaManager = Option.ofNullable(internalSchemaManager).orElseGet(() -> InternalSchemaManager.get(this.conf, this.metaClient));
+    this.internalSchemaManager = Option.ofNullable(internalSchemaManager).orElseGet(() -> InternalSchemaManager.get(this.hadoopConf, this.metaClient));
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkReaderContextFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkReaderContextFactory.java
index fe66ee66cb57..b4a720675cc5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkReaderContextFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkReaderContextFactory.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 
 import java.util.Collections;
@@ -35,6 +34,7 @@ import java.util.function.Supplier;
  */
 public class FlinkReaderContextFactory implements ReaderContextFactory<RowData> {
   private final HoodieTableMetaClient metaClient;
+  private InternalSchemaManager internalSchemaManager;
 
   public FlinkReaderContextFactory(HoodieTableMetaClient metaClient) {
     this.metaClient = metaClient;
@@ -42,9 +42,15 @@ public class FlinkReaderContextFactory implements ReaderContextFactory<RowData>
 
   @Override
   public HoodieReaderContext<RowData> getContext() {
-    Supplier<InternalSchemaManager> internalSchemaManager = () -> InternalSchemaManager.get(metaClient.getStorageConf().unwrapAs(Configuration.class), metaClient);
-
-    return new FlinkRowDataReaderContext(metaClient.getStorageConf(), internalSchemaManager,
-        Collections.emptyList(), metaClient.getTableConfig(), Option.empty());
+    Supplier<InternalSchemaManager> internalSchemaManagerSupplier = () -> {
+      // CAUTION: instantiate internalSchemaManager lazily here since it may not be needed for FG reader,
+      // e.g., schema evolution for log files in FG reader do not use internalSchemaManager.
+      if (internalSchemaManager == null) {
+        internalSchemaManager = InternalSchemaManager.get(metaClient.getStorageConf(), metaClient);
+      }
+      return internalSchemaManager;
+    };
+    return new FlinkRowDataReaderContext(
+        metaClient.getStorageConf(), internalSchemaManagerSupplier, Collections.emptyList(), metaClient.getTableConfig(), Option.empty());
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
index 35d0b49cf1a7..6ee6a8d81d53 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.format;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -28,18 +29,16 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.configuration.HadoopConfigurations;
-import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.Type;
 import org.apache.hudi.internal.schema.Types;
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.util.AvroSchemaConverter;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
@@ -64,16 +63,15 @@ public class InternalSchemaManager implements Serializable {
   public static final InternalSchemaManager DISABLED = new InternalSchemaManager(null, InternalSchema.getEmptyInternalSchema(), null, null,
       TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION), null);
 
-  private final Configuration conf;
   private final InternalSchema querySchema;
   private final String validCommits;
   private final String tablePath;
   private final TimelineLayout layout;
   private final HoodieTableConfig tableConfig;
-  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+  private final StorageConfiguration<?> storageConf;
 
-  public static InternalSchemaManager get(Configuration conf, HoodieTableMetaClient metaClient) {
-    if (!OptionsResolver.isSchemaEvolutionEnabled(conf)) {
+  public static InternalSchemaManager get(StorageConfiguration<?> conf, HoodieTableMetaClient metaClient) {
+    if (!isSchemaEvolutionEnabled(conf)) {
       return DISABLED;
     }
     Option<InternalSchema> internalSchema = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
@@ -91,9 +89,9 @@ public class InternalSchemaManager implements Serializable {
     return new InternalSchemaManager(conf, internalSchema.get(), validCommits, metaClient.getBasePath().toString(), metaClient.getTimelineLayout(), metaClient.getTableConfig());
   }
 
-  public InternalSchemaManager(Configuration conf, InternalSchema querySchema, String validCommits, String tablePath,
+  public InternalSchemaManager(StorageConfiguration<?> storageConf, InternalSchema querySchema, String validCommits, String tablePath,
                                TimelineLayout layout, HoodieTableConfig tableConfig) {
-    this.conf = conf;
+    this.storageConf = storageConf;
     this.querySchema = querySchema;
     this.validCommits = validCommits;
     this.tablePath = tablePath;
@@ -123,7 +121,7 @@ public class InternalSchemaManager implements Serializable {
     long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(fileName));
     InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(
         commitInstantTime, tablePath,
-        new HoodieHadoopStorage(tablePath, getHadoopConf()),
+        new HoodieHadoopStorage(tablePath, storageConf),
         validCommits, layout, tableConfig);
     if (querySchema.equals(fileSchema)) {
       return InternalSchema.getEmptyInternalSchema();
@@ -228,10 +226,10 @@ public class InternalSchemaManager implements Serializable {
     return Collections.unmodifiableMap(posProxy);
   }
 
-  private org.apache.hadoop.conf.Configuration getHadoopConf() {
-    if (hadoopConf == null) {
-      hadoopConf = HadoopConfigurations.getHadoopConf(conf);
-    }
-    return hadoopConf;
+  /**
+   * Returns whether comprehensive schema evolution enabled.
+   */
+  private static boolean isSchemaEvolutionEnabled(StorageConfiguration<?> conf) {
+    return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue());
   }
 }
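The InternalSchemaManager refactor above decouples the manager from Flink's Configuration: callers now pass a StorageConfiguration, and the factory short-circuits to DISABLED when schema evolution is switched off, so no commit metadata is resolved in that case. A hedged usage sketch, assuming a HoodieTableMetaClient for the table is already built; the wrapper class and method names are illustrative, not from the patch.

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.format.InternalSchemaManager;

class SchemaManagerUsageSketch {
  static InternalSchemaManager resolve(HoodieTableMetaClient metaClient) {
    // The new entry point takes the table's storage configuration directly.
    StorageConfiguration<?> storageConf = metaClient.getStorageConf();
    InternalSchemaManager manager = InternalSchemaManager.get(storageConf, metaClient);
    if (manager == InternalSchemaManager.DISABLED) {
      // Schema evolution is disabled: get(...) returned before touching commit metadata.
    }
    return manager;
  }
}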