danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1194731645


##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -694,17 +695,75 @@ private Long getTableChecksum() {
     return getLong(TABLE_CHECKSUM);
   }
 
-  public List<String> getMetadataPartitionsInflight() {
-    return StringUtils.split(
-        getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, 
StringUtils.EMPTY_STRING),
-        CONFIG_VALUES_DELIMITER
-    );
+  public Set<String> getMetadataPartitionsInflight() {
+    return new HashSet<>(StringUtils.split(
+            getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, 
StringUtils.EMPTY_STRING),
+            CONFIG_VALUES_DELIMITER));
   }
 
   public Set<String> getMetadataPartitions() {
     return new HashSet<>(
-        StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, 
StringUtils.EMPTY_STRING),
-            CONFIG_VALUES_DELIMITER));
+            StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, 
StringUtils.EMPTY_STRING),
+                    CONFIG_VALUES_DELIMITER));
+  }
+
+  /**
+   * @returns true if metadata table has been created and is being used for 
this dataset, else returns false.
+   */
+  public boolean isMetadataTableEnabled() {
+    return isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+  }
+
+  /**
+   * Checks if metadata table is enabled and the specified partition has been 
initialized.
+   *
+   * @param partition The partition to check
+   * @returns true if the specific partition has been initialized, else 
returns false.
+   */
+  public boolean isMetadataPartitionEnabled(MetadataPartitionType partition) {
+    return getMetadataPartitions().contains(partition.getPartitionPath());
+  }
+
+  /**
+   * Enables or disables the specified metadata table partition.
+   *
+   * @param partition The partition
+   * @param enabled   If true, the partition is enabled, else disabled
+   */
+  public void setMetadataPartitionState(MetadataPartitionType partition, 
boolean enabled) {
+    
ValidationUtils.checkArgument(!partition.getPartitionPath().contains(CONFIG_VALUES_DELIMITER),
+            "Metadata Table partition path cannot contain a comma: " + 
partition.getPartitionPath());
+    Set<String> partitions = getMetadataPartitions();
+    Set<String> partitionsInflight = getMetadataPartitionsInflight();
+    if (enabled) {
+      partitions.add(partition.getPartitionPath());
+      partitionsInflight.remove(partition.getPartitionPath());
+    } else if (partition.equals(MetadataPartitionType.FILES)) {
+      // file listing partition is required for all other partitions to work
+      // Disabling file partition will also disable all partitions
+      partitions.clear();
+      partitionsInflight.clear();
+    } else {
+      partitions.remove(partition.getPartitionPath());
+      partitionsInflight.remove(partition.getPartitionPath());
+    }
+    setValue(TABLE_METADATA_PARTITIONS, 
partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
+    setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, 
partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));

Review Comment:
   Do we need to persist these options?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -337,15 +337,33 @@ protected void initMetadataTable(Option<String> 
instantTime) {
    *
    * @param inFlightInstantTimestamp - The in-flight action responsible for 
the metadata table initialization
    */
-  private void initializeMetadataTable(Option<String> 
inFlightInstantTimestamp) {
-    if (config.isMetadataTableEnabled()) {
-      HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), 
config,
-          context, Option.empty(), inFlightInstantTimestamp);
-      try {
-        writer.close();
-      } catch (Exception e) {
-        throw new HoodieException("Failed to instantiate Metadata table ", e);
+  private void initializeMetadataTable(WriteOperationType operationType, 
Option<String> inFlightInstantTimestamp) {
+    if (!config.isMetadataTableEnabled()) {
+      return;
+    }
+
+    try (HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), 
config,
+            context, Option.empty(), inFlightInstantTimestamp)) {
+      if (writer.isInitialized()) {
+        // Optimize the metadata table which involves compacton. cleaning, 
etc. This should only be called from writers.
+        switch (operationType) {
+          case INSERT:
+          case INSERT_PREPPED:
+          case UPSERT:
+          case UPSERT_PREPPED:
+          case BULK_INSERT:
+          case BULK_INSERT_PREPPED:
+          case DELETE:

Review Comment:
   Enumerating the write operation types is really hard to maintain; can we
trigger the table service regardless of the operation type?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -760,15 +815,14 @@ private void initializeFileGroups(HoodieTableMetaClient 
dataMetaClient, Metadata
 
         HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
             
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), 
metadataPartition.getPartitionPath()))
-            .withFileId(fileGroupFileId)
-            .overBaseCommit(instantTime)
+            .withFileId(fileGroupFileId).overBaseCommit(instantTime)
             .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
             .withFileSize(0L)
-            .withSizeThreshold(metadataWriteConfig.getLogFileMaxSize())
-            .withFs(dataMetaClient.getFs())
-            .withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
-            .withLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
-            .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+                .withSizeThreshold(metadataWriteConfig.getLogFileMaxSize())
+                .withFs(dataMetaClient.getFs())
+                .withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)

Review Comment:
   Fix the indentation.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -375,105 +357,91 @@ public List<MetadataPartitionType> 
getEnabledPartitionTypes() {
     return this.enabledPartitionTypes;
   }
 
-  /**
-   * Initialize the metadata table if it does not exist.
-   * <p>
-   * If the metadata table does not exist, then file and partition listing is 
used to initialize the table.
-   *
-   * @param engineContext
-   * @param actionMetadata           Action metadata types extending Avro 
generated SpecificRecordBase
-   * @param inflightInstantTimestamp Timestamp of an instant in progress on 
the dataset. This instant is ignored
-   *                                 while deciding to initialize the metadata 
table.
-   */
-  protected abstract <T extends SpecificRecordBase> void 
initialize(HoodieEngineContext engineContext,
-                                                                    Option<T> 
actionMetadata,
-                                                                    
Option<String> inflightInstantTimestamp);
-
-  public void initTableMetadata() {
-    try {
-      if (this.metadata != null) {
-        this.metadata.close();
-      }
-      this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(),
-          dataWriteConfig.getBasePath(), 
dataWriteConfig.getSpillableMapBasePath());
-      this.metadataMetaClient = metadata.getMetadataMetaClient();
-    } catch (Exception e) {
-      throw new HoodieException("Error initializing metadata table for reads", 
e);
-    }
-  }
-
   /**
    * Initialize the metadata table if needed.
    *
    * @param dataMetaClient           - meta client for the data table
    * @param actionMetadata           - optional action metadata
    * @param inflightInstantTimestamp - timestamp of an instant in progress on 
the dataset
    * @param <T>                      - action metadata types extending Avro 
generated SpecificRecordBase
-   * @throws IOException
+   * @throws IOException on errors
    */
-  protected <T extends SpecificRecordBase> void 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
-                                                                   Option<T> 
actionMetadata,
-                                                                   
Option<String> inflightInstantTimestamp) throws IOException {
+  protected <T extends SpecificRecordBase> boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                                                      
Option<T> actionMetadata,
+                                                                      
Option<String> inflightInstantTimestamp) throws IOException {
     HoodieTimer timer = HoodieTimer.start();
+    List<MetadataPartitionType> partitionsToInit = new 
ArrayList<>(MetadataPartitionType.values().length);
 
-    boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+    try {
+      boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+      if (!exists) {
+        // FILES partition is always required
+        partitionsToInit.add(MetadataPartitionType.FILES);
+      }
 
-    if (!exists) {
-      // Initialize for the first time by listing partitions and files 
directly from the file system
-      if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
-        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      // check if any of the enabled partition types needs to be initialized
+      // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
+      if (!dataWriteConfig.isMetadataAsyncIndex()) {
+        Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+        LOG.info("Async metadata indexing disabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
+        this.enabledPartitionTypes.stream()
+                .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
+                .forEach(partitionsToInit::add);
       }
-      return;
-    }
 
-    // if metadata table exists, then check if any of the enabled partition 
types needs to be initialized
-    // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
-    if (!dataWriteConfig.isMetadataAsyncIndex()) {
-      Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-      LOG.info("Async metadata indexing enabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
-      List<MetadataPartitionType> partitionsToInit = 
this.enabledPartitionTypes.stream()
-          .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
-          .collect(Collectors.toList());
-      // if there are no partitions to initialize or there is a pending 
operation, then don't initialize in this round
-      if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, 
inflightInstantTimestamp)) {
-        return;
+      if (partitionsToInit.isEmpty()) {
+        // No partitions to initialize
+        return true;
+      }
+
+      // If there is no commit on the dataset yet, use the 
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+      // Otherwise, we use the timestamp of the latest completed action.
+      String initializationTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+
+      // Initialize partitions for the first time using data from the files on 
the file system
+      if (!initializeFromFilesystem(initializationTime, partitionsToInit, 
inflightInstantTimestamp)) {
+        LOG.error("Failed to initialize MDT from filesystem");
+        return false;
       }
 
-      String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-      initTableMetadata(); // re-init certain flags in BaseTableMetadata
-      initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
partitionsToInit);
-      initialCommit(createInstantTime, partitionsToInit);
-      updateInitializedPartitionsInTableConfig(partitionsToInit);
+      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      return true;
+    } catch (IOException e) {
+      LOG.error("Failed to initialize metadata table. Disabling the writer.", 
e);
+      return false;
     }
   }
 
   private <T extends SpecificRecordBase> boolean 
metadataTableExists(HoodieTableMetaClient dataMetaClient,
                                                                      Option<T> 
actionMetadata) throws IOException {
-    boolean exists = dataMetaClient.getFs().exists(new 
Path(metadataWriteConfig.getBasePath(),
-        HoodieTableMetaClient.METAFOLDER_NAME));
+    boolean exists = dataMetaClient.getTableConfig().isMetadataTableEnabled();
     boolean reInitialize = false;
 
     // If the un-synced instants have been archived, then
     // the metadata table will need to be initialized again.
     if (exists) {
-      HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get())
-          .setBasePath(metadataWriteConfig.getBasePath()).build();
+      try {
+        metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+      } catch (TableNotFoundException e) {
+        // Table not found, initialize the metadata table.
+        metadataMetaClient = initializeMetaClient();
+      }
 
       if (DEFAULT_METADATA_POPULATE_META_FIELDS != 
metadataMetaClient.getTableConfig().populateMetaFields()) {
         LOG.info("Re-initiating metadata table properties since populate meta 
fields have changed");
-        metadataMetaClient = 
initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
+        metadataMetaClient = initializeMetaClient();

Review Comment:
   If the MDT does not exist and
`metadataMetaClient.getTableConfig().populateMetaFields()` is true, then
`initializeMetaClient()` could be invoked twice, which could cause an exception.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1052,51 +1091,81 @@ protected HoodieData<HoodieRecord> 
prepRecords(Map<MetadataPartitionType,
   }
 
   /**
-   *  Perform a compaction on the Metadata Table.
-   *
-   * Cases to be handled:
-   *   1. We cannot perform compaction if there are previous inflight 
operations on the dataset. This is because
-   *      a compacted metadata base file at time Tx should represent all the 
actions on the dataset till time Tx.
-   *
-   *   2. In multi-writer scenario, a parallel operation with a greater 
instantTime may have completed creating a
-   *      deltacommit.
+   * Optimize the metadata table by running compaction, clean and archive as 
required.
+   * <p>
+   * Don't perform optimization if there are inflight operations on the 
dataset. This is for two reasons:
+   * - The compaction will contain the correct data as all failed operations 
have been rolled back.
+   * - Clean/compaction etc. will have the highest timestamp on the MDT and we 
won't be adding new operations
+   * with smaller timestamps to metadata table (makes for easier debugging)
+   * <p>
+   * This adds the limitations that long-running async operations (clustering, 
etc.) may cause delay in such MDT
+   * optimizations. We will relax this after MDT code has been hardened.
    */
-  protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String 
instantTime) {
-    // finish off any pending compactions if any from previous attempt.
-    writeClient.runAnyPendingCompactions();
-
-    String latestDeltaCommitTimeInMetadataTable = 
metadataMetaClient.reloadActiveTimeline()
-        .getDeltaCommitTimeline()
-        .filterCompletedInstants()
-        .lastInstant().orElseThrow(() -> new HoodieMetadataException("No 
completed deltacommit in metadata table"))
-        .getTimestamp();
-    // we need to find if there are any inflights in data table timeline 
before or equal to the latest delta commit in metadata table.
-    // Whenever you want to change this logic, please ensure all below 
scenarios are considered.
-    // a. There could be a chance that latest delta commit in MDT is committed 
in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
-    // b. There could be DT inflights after latest delta commit in MDT and we 
are ok with it. bcoz, the contract is, latest compaction instant time in MDT 
represents
-    // any instants before that is already synced with metadata table.
-    // c. Do consider out of order commits. For eg, c4 from DT could complete 
before c3. and we can't trigger compaction in MDT with c4 as base instant time, 
until every
-    // instant before c4 is synced with metadata table.
-    List<HoodieInstant> pendingInstants = 
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
-        
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
+  @Override
+  public void performTableServices(Option<String> inFlightInstantTimestamp) {
+    HoodieTimer metadataTableServicesTimer = HoodieTimer.start();
+    boolean allTableServicesExecutedSuccessfullyOrSkipped = true;
+    try {
+      BaseHoodieWriteClient writeClient = getWriteClient();
+      // Run any pending table services operations.
+      runPendingTableServicesOperations(writeClient);
+
+      // Check and run clean operations.
+      String latestDeltacommitTime = 
metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline()
+              .filterCompletedInstants()
+              .lastInstant().get()
+              .getTimestamp();
+      LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + 
", running clean operations.");
+      cleanIfNecessary(writeClient, latestDeltacommitTime);
+
+      // Do timeline validation before scheduling compaction/logcompaction 
operations.
+      if 
(!validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, 
latestDeltacommitTime)) {
+        return;

Review Comment:
   We should not return directly here, because that also blocks archiving: even
if no compaction plan needs to be scheduled, archiving should still be triggered.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -564,53 +532,147 @@ private <T extends SpecificRecordBase> boolean 
isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
                                            Option<String> 
inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
 
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built 
using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes =  new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+            || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()

Review Comment:
   Can you elaborate on why we still load file info from the MDT if it was
enabled before? Could this cause metadata inconsistency?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -489,7 +457,7 @@ private <T extends SpecificRecordBase> boolean 
metadataTableExists(HoodieTableMe
    * TODO: Revisit this logic and validate that filtering for all
    *       commits timeline is the right thing to do
    *
-   * @return True if the initialize is not needed, False otherwise
+   * @return True if the initialization is not needed, False otherwise

Review Comment:
   Replace all occurrences of `latestMetadataInstant.get().getTimestamp()` with
`latestMetadataInstantTimestamp`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to