danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1194731645
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -694,17 +695,75 @@ private Long getTableChecksum() {
return getLong(TABLE_CHECKSUM);
}
- public List<String> getMetadataPartitionsInflight() {
- return StringUtils.split(
- getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT,
StringUtils.EMPTY_STRING),
- CONFIG_VALUES_DELIMITER
- );
+ public Set<String> getMetadataPartitionsInflight() {
+ return new HashSet<>(StringUtils.split(
+ getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT,
StringUtils.EMPTY_STRING),
+ CONFIG_VALUES_DELIMITER));
}
public Set<String> getMetadataPartitions() {
return new HashSet<>(
- StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS,
StringUtils.EMPTY_STRING),
- CONFIG_VALUES_DELIMITER));
+ StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS,
StringUtils.EMPTY_STRING),
+ CONFIG_VALUES_DELIMITER));
+ }
+
+ /**
+ * @returns true if metadata table has been created and is being used for
this dataset, else returns false.
+ */
+ public boolean isMetadataTableEnabled() {
+ return isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+ }
+
+ /**
+ * Checks if metadata table is enabled and the specified partition has been
initialized.
+ *
+ * @param partition The partition to check
+ * @returns true if the specific partition has been initialized, else
returns false.
+ */
+ public boolean isMetadataPartitionEnabled(MetadataPartitionType partition) {
+ return getMetadataPartitions().contains(partition.getPartitionPath());
+ }
+
+ /**
+ * Enables or disables the specified metadata table partition.
+ *
+ * @param partition The partition
+ * @param enabled If true, the partition is enabled, else disabled
+ */
+ public void setMetadataPartitionState(MetadataPartitionType partition,
boolean enabled) {
+
ValidationUtils.checkArgument(!partition.getPartitionPath().contains(CONFIG_VALUES_DELIMITER),
+ "Metadata Table partition path cannot contain a comma: " +
partition.getPartitionPath());
+ Set<String> partitions = getMetadataPartitions();
+ Set<String> partitionsInflight = getMetadataPartitionsInflight();
+ if (enabled) {
+ partitions.add(partition.getPartitionPath());
+ partitionsInflight.remove(partition.getPartitionPath());
+ } else if (partition.equals(MetadataPartitionType.FILES)) {
+ // file listing partition is required for all other partitions to work
+ // Disabling file partition will also disable all partitions
+ partitions.clear();
+ partitionsInflight.clear();
+ } else {
+ partitions.remove(partition.getPartitionPath());
+ partitionsInflight.remove(partition.getPartitionPath());
+ }
+ setValue(TABLE_METADATA_PARTITIONS,
partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
+ setValue(TABLE_METADATA_PARTITIONS_INFLIGHT,
partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
Review Comment:
Do we need to persist these options?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -337,15 +337,33 @@ protected void initMetadataTable(Option<String>
instantTime) {
*
* @param inFlightInstantTimestamp - The in-flight action responsible for
the metadata table initialization
*/
- private void initializeMetadataTable(Option<String>
inFlightInstantTimestamp) {
- if (config.isMetadataTableEnabled()) {
- HoodieTableMetadataWriter writer =
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(),
config,
- context, Option.empty(), inFlightInstantTimestamp);
- try {
- writer.close();
- } catch (Exception e) {
- throw new HoodieException("Failed to instantiate Metadata table ", e);
+ private void initializeMetadataTable(WriteOperationType operationType,
Option<String> inFlightInstantTimestamp) {
+ if (!config.isMetadataTableEnabled()) {
+ return;
+ }
+
+ try (HoodieTableMetadataWriter writer =
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(),
config,
+ context, Option.empty(), inFlightInstantTimestamp)) {
+ if (writer.isInitialized()) {
+ // Optimize the metadata table which involves compacton. cleaning,
etc. This should only be called from writers.
+ switch (operationType) {
+ case INSERT:
+ case INSERT_PREPPED:
+ case UPSERT:
+ case UPSERT_PREPPED:
+ case BULK_INSERT:
+ case BULK_INSERT_PREPPED:
+ case DELETE:
Review Comment:
Enumerating the write operation types is really hard to maintain; can we trigger the
table service regardless of which operation it is?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -760,15 +815,14 @@ private void initializeFileGroups(HoodieTableMetaClient
dataMetaClient, Metadata
HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(),
metadataPartition.getPartitionPath()))
- .withFileId(fileGroupFileId)
- .overBaseCommit(instantTime)
+ .withFileId(fileGroupFileId).overBaseCommit(instantTime)
.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
.withFileSize(0L)
- .withSizeThreshold(metadataWriteConfig.getLogFileMaxSize())
- .withFs(dataMetaClient.getFs())
- .withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
- .withLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+ .withSizeThreshold(metadataWriteConfig.getLogFileMaxSize())
+ .withFs(dataMetaClient.getFs())
+ .withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
Review Comment:
Fix the indentation.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -375,105 +357,91 @@ public List<MetadataPartitionType>
getEnabledPartitionTypes() {
return this.enabledPartitionTypes;
}
- /**
- * Initialize the metadata table if it does not exist.
- * <p>
- * If the metadata table does not exist, then file and partition listing is
used to initialize the table.
- *
- * @param engineContext
- * @param actionMetadata Action metadata types extending Avro
generated SpecificRecordBase
- * @param inflightInstantTimestamp Timestamp of an instant in progress on
the dataset. This instant is ignored
- * while deciding to initialize the metadata
table.
- */
- protected abstract <T extends SpecificRecordBase> void
initialize(HoodieEngineContext engineContext,
- Option<T>
actionMetadata,
-
Option<String> inflightInstantTimestamp);
-
- public void initTableMetadata() {
- try {
- if (this.metadata != null) {
- this.metadata.close();
- }
- this.metadata = new HoodieBackedTableMetadata(engineContext,
dataWriteConfig.getMetadataConfig(),
- dataWriteConfig.getBasePath(),
dataWriteConfig.getSpillableMapBasePath());
- this.metadataMetaClient = metadata.getMetadataMetaClient();
- } catch (Exception e) {
- throw new HoodieException("Error initializing metadata table for reads",
e);
- }
- }
-
/**
* Initialize the metadata table if needed.
*
* @param dataMetaClient - meta client for the data table
* @param actionMetadata - optional action metadata
* @param inflightInstantTimestamp - timestamp of an instant in progress on
the dataset
* @param <T> - action metadata types extending Avro
generated SpecificRecordBase
- * @throws IOException
+ * @throws IOException on errors
*/
- protected <T extends SpecificRecordBase> void
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
- Option<T>
actionMetadata,
-
Option<String> inflightInstantTimestamp) throws IOException {
+ protected <T extends SpecificRecordBase> boolean
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+
Option<T> actionMetadata,
+
Option<String> inflightInstantTimestamp) throws IOException {
HoodieTimer timer = HoodieTimer.start();
+ List<MetadataPartitionType> partitionsToInit = new
ArrayList<>(MetadataPartitionType.values().length);
- boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+ try {
+ boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+ if (!exists) {
+ // FILES partition is always required
+ partitionsToInit.add(MetadataPartitionType.FILES);
+ }
- if (!exists) {
- // Initialize for the first time by listing partitions and files
directly from the file system
- if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
- metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+ // check if any of the enabled partition types needs to be initialized
+ // NOTE: It needs to be guarded by async index config because if that is
enabled then initialization happens through the index scheduler.
+ if (!dataWriteConfig.isMetadataAsyncIndex()) {
+ Set<String> inflightAndCompletedPartitions =
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+ LOG.info("Async metadata indexing disabled and following partitions
already initialized: " + inflightAndCompletedPartitions);
+ this.enabledPartitionTypes.stream()
+ .filter(p ->
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) &&
!MetadataPartitionType.FILES.equals(p))
+ .forEach(partitionsToInit::add);
}
- return;
- }
- // if metadata table exists, then check if any of the enabled partition
types needs to be initialized
- // NOTE: It needs to be guarded by async index config because if that is
enabled then initialization happens through the index scheduler.
- if (!dataWriteConfig.isMetadataAsyncIndex()) {
- Set<String> inflightAndCompletedPartitions =
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
- LOG.info("Async metadata indexing enabled and following partitions
already initialized: " + inflightAndCompletedPartitions);
- List<MetadataPartitionType> partitionsToInit =
this.enabledPartitionTypes.stream()
- .filter(p ->
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) &&
!MetadataPartitionType.FILES.equals(p))
- .collect(Collectors.toList());
- // if there are no partitions to initialize or there is a pending
operation, then don't initialize in this round
- if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient,
inflightInstantTimestamp)) {
- return;
+ if (partitionsToInit.isEmpty()) {
+ // No partitions to initialize
+ return true;
+ }
+
+ // If there is no commit on the dataset yet, use the
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+ // Otherwise, we use the timestamp of the latest completed action.
+ String initializationTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+
+ // Initialize partitions for the first time using data from the files on
the file system
+ if (!initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp)) {
+ LOG.error("Failed to initialize MDT from filesystem");
+ return false;
}
- String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
- initTableMetadata(); // re-init certain flags in BaseTableMetadata
- initializeEnabledFileGroups(dataMetaClient, createInstantTime,
partitionsToInit);
- initialCommit(createInstantTime, partitionsToInit);
- updateInitializedPartitionsInTableConfig(partitionsToInit);
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+ return true;
+ } catch (IOException e) {
+ LOG.error("Failed to initialize metadata table. Disabling the writer.",
e);
+ return false;
}
}
private <T extends SpecificRecordBase> boolean
metadataTableExists(HoodieTableMetaClient dataMetaClient,
Option<T>
actionMetadata) throws IOException {
- boolean exists = dataMetaClient.getFs().exists(new
Path(metadataWriteConfig.getBasePath(),
- HoodieTableMetaClient.METAFOLDER_NAME));
+ boolean exists = dataMetaClient.getTableConfig().isMetadataTableEnabled();
boolean reInitialize = false;
// If the un-synced instants have been archived, then
// the metadata table will need to be initialized again.
if (exists) {
- HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf.get())
- .setBasePath(metadataWriteConfig.getBasePath()).build();
+ try {
+ metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+ } catch (TableNotFoundException e) {
+ // Table not found, initialize the metadata table.
+ metadataMetaClient = initializeMetaClient();
+ }
if (DEFAULT_METADATA_POPULATE_META_FIELDS !=
metadataMetaClient.getTableConfig().populateMetaFields()) {
LOG.info("Re-initiating metadata table properties since populate meta
fields have changed");
- metadataMetaClient =
initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
+ metadataMetaClient = initializeMetaClient();
Review Comment:
If the MDT does not exist and
`metadataMetaClient.getTableConfig().populateMetaFields()` is true, then
`initializeMetaClient()` could be invoked twice, which could cause an exception.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1052,51 +1091,81 @@ protected HoodieData<HoodieRecord>
prepRecords(Map<MetadataPartitionType,
}
/**
- * Perform a compaction on the Metadata Table.
- *
- * Cases to be handled:
- * 1. We cannot perform compaction if there are previous inflight
operations on the dataset. This is because
- * a compacted metadata base file at time Tx should represent all the
actions on the dataset till time Tx.
- *
- * 2. In multi-writer scenario, a parallel operation with a greater
instantTime may have completed creating a
- * deltacommit.
+ * Optimize the metadata table by running compaction, clean and archive as
required.
+ * <p>
+ * Don't perform optimization if there are inflight operations on the
dataset. This is for two reasons:
+ * - The compaction will contain the correct data as all failed operations
have been rolled back.
+ * - Clean/compaction etc. will have the highest timestamp on the MDT and we
won't be adding new operations
+ * with smaller timestamps to metadata table (makes for easier debugging)
+ * <p>
+ * This adds the limitations that long-running async operations (clustering,
etc.) may cause delay in such MDT
+ * optimizations. We will relax this after MDT code has been hardened.
*/
- protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String
instantTime) {
- // finish off any pending compactions if any from previous attempt.
- writeClient.runAnyPendingCompactions();
-
- String latestDeltaCommitTimeInMetadataTable =
metadataMetaClient.reloadActiveTimeline()
- .getDeltaCommitTimeline()
- .filterCompletedInstants()
- .lastInstant().orElseThrow(() -> new HoodieMetadataException("No
completed deltacommit in metadata table"))
- .getTimestamp();
- // we need to find if there are any inflights in data table timeline
before or equal to the latest delta commit in metadata table.
- // Whenever you want to change this logic, please ensure all below
scenarios are considered.
- // a. There could be a chance that latest delta commit in MDT is committed
in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
- // b. There could be DT inflights after latest delta commit in MDT and we
are ok with it. bcoz, the contract is, latest compaction instant time in MDT
represents
- // any instants before that is already synced with metadata table.
- // c. Do consider out of order commits. For eg, c4 from DT could complete
before c3. and we can't trigger compaction in MDT with c4 as base instant time,
until every
- // instant before c4 is synced with metadata table.
- List<HoodieInstant> pendingInstants =
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
-
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
+ @Override
+ public void performTableServices(Option<String> inFlightInstantTimestamp) {
+ HoodieTimer metadataTableServicesTimer = HoodieTimer.start();
+ boolean allTableServicesExecutedSuccessfullyOrSkipped = true;
+ try {
+ BaseHoodieWriteClient writeClient = getWriteClient();
+ // Run any pending table services operations.
+ runPendingTableServicesOperations(writeClient);
+
+ // Check and run clean operations.
+ String latestDeltacommitTime =
metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline()
+ .filterCompletedInstants()
+ .lastInstant().get()
+ .getTimestamp();
+ LOG.info("Latest deltacommit time found is " + latestDeltacommitTime +
", running clean operations.");
+ cleanIfNecessary(writeClient, latestDeltacommitTime);
+
+ // Do timeline validation before scheduling compaction/logcompaction
operations.
+ if
(!validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp,
latestDeltacommitTime)) {
+ return;
Review Comment:
We should not return directly here, because that also blocks the archiving: even if no
compaction plan should be scheduled, the archiving should still be triggered.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -564,53 +532,147 @@ private <T extends SpecificRecordBase> boolean
isCommitRevertedByInFlightAction(
/**
* Initialize the Metadata Table by listing files and partitions from the
file system.
*
- * @param dataMetaClient - {@code HoodieTableMetaClient} for the
dataset.
+ * @param initializationTime - Timestamp to use for the commit
+ * @param partitionsToInit - List of MDT partitions to initialize
* @param inflightInstantTimestamp - Current action instant responsible for
this initialization
*/
- private boolean initializeFromFilesystem(HoodieTableMetaClient
dataMetaClient,
+ private boolean initializeFromFilesystem(String initializationTime,
List<MetadataPartitionType> partitionsToInit,
Option<String>
inflightInstantTimestamp) throws IOException {
if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
return false;
}
- String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
- initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
- initTableMetadata();
- // if async metadata indexing is enabled,
- // then only initialize files partition as other partitions will be built
using HoodieIndexer
- List<MetadataPartitionType> enabledPartitionTypes = new ArrayList<>();
- if (dataWriteConfig.isMetadataAsyncIndex()) {
- enabledPartitionTypes.add(MetadataPartitionType.FILES);
- } else {
- // all enabled ones should be initialized
- enabledPartitionTypes = this.enabledPartitionTypes;
+ // FILES partition is always initialized first
+
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+ || partitionsToInit.get(0).equals(MetadataPartitionType.FILES),
"FILES partition should be initialized first: " + partitionsToInit);
+
+ metadataMetaClient = initializeMetaClient();
+
+ // Get a complete list of files and partitions from the file system or
from already initialized FILES partition of MDT
+ boolean filesPartitionAvailable =
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+ List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ?
listAllPartitionsFromMDT(initializationTime) :
listAllPartitionsFromFilesystem(initializationTime);
+ Map<String, Map<String, Long>> partitionToFilesMap =
partitionInfoList.stream()
Review Comment:
Can you elaborate on why we still load file info from the MDT if it was enabled
before? Could this cause metadata inconsistency?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -489,7 +457,7 @@ private <T extends SpecificRecordBase> boolean
metadataTableExists(HoodieTableMe
* TODO: Revisit this logic and validate that filtering for all
* commits timeline is the right thing to do
*
- * @return True if the initialize is not needed, False otherwise
+ * @return True if the initialization is not needed, False otherwise
Review Comment:
Replace all the `latestMetadataInstant.get().getTimestamp()` with
`latestMetadataInstantTimestamp` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]