nsivabalan commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1191243539


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -873,17 +908,7 @@ public void buildMetadataPartitions(HoodieEngineContext 
engineContext, List<Hood
     indexPartitionInfos.forEach(indexPartitionInfo -> {
       String relativePartitionPath = 
indexPartitionInfo.getMetadataPartitionPath();
       LOG.info(String.format("Creating a new metadata index for partition '%s' 
under path %s upto instant %s",
-          relativePartitionPath, metadataWriteConfig.getBasePath(), 
indexUptoInstantTime));
-      try {
-        // file group should have already been initialized while scheduling 
index for this partition
-        if (!dataMetaClient.getFs().exists(new 
Path(metadataWriteConfig.getBasePath(), relativePartitionPath))) {

Review Comment:
   Can you point me to the code where we handle partial initialization failure? 
This removed code appears to have been handling that case — I assume it is now 
handled elsewhere in this patch, which is why it was removed.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1097,87 +1165,76 @@ protected void cleanIfNecessary(BaseHoodieWriteClient 
writeClient, String instan
     // Trigger cleaning with suffixes based on the same instant time. This 
ensures that any future
     // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
     // metadata table.
-    writeClient.clean(instantTime + "002");
+    
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
     writeClient.lazyRollbackFailedIndexing();
   }
 
   /**
-   * This is invoked to initialize metadata table for a dataset.
-   * Initial commit has special handling mechanism due to its scale compared 
to other regular commits.
-   * During cold startup, the list of files to be committed can be huge.
-   * So creating a HoodieCommitMetadata out of these large number of files,
-   * and calling the existing update(HoodieCommitMetadata) function does not 
scale well.
-   * Hence, we have a special commit just for the initialization scenario.
+   * Validates the timeline for both main and metadata tables.
    */
-  private void initialCommit(String createInstantTime, 
List<MetadataPartitionType> partitionTypes) {
-    // List all partitions in the basePath of the containing dataset
-    LOG.info("Initializing metadata table by using file listings in " + 
dataWriteConfig.getBasePath());
-    engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing 
metadata table by listing files and partitions: " + 
dataWriteConfig.getTableName());
-
-    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap 
= new HashMap<>();
-
-    // skip file system listing to populate metadata records if it's a fresh 
table.
-    // this is applicable only if the table already has N commits and metadata 
is enabled at a later point in time.
-    if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // 
SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
-      // If not, last completed commit in data table will be chosen as the 
initial commit time.
-      LOG.info("Triggering empty Commit to metadata to initialize");
-    } else {
-      List<DirectoryInfo> partitionInfoList = 
listAllPartitions(dataMetaClient);
-      Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
-          .map(p -> {
-            String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-            return Pair.of(partitionName, p.getFileNameToSizeMap());
-          })
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-      int totalDataFilesCount = 
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-      List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-      if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-        // Record which saves the list of all partitions
-        HoodieRecord allPartitionRecord = 
HoodieMetadataPayload.createPartitionListRecord(partitions);
-        HoodieData<HoodieRecord> filesPartitionRecords = 
getFilesPartitionRecords(createInstantTime, partitionInfoList, 
allPartitionRecord);
-        ValidationUtils.checkState(filesPartitionRecords.count() == 
(partitions.size() + 1));
-        partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecords);
-      }
-
-      if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && 
totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
-        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
recordsRDD);
-      }
-
-      if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && 
totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
-        partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, 
recordsRDD);
-      }
-      LOG.info("Committing " + partitions.size() + " partitions and " + 
totalDataFilesCount + " files to metadata");
+  private boolean validateTimelineBeforeSchedulingCompaction(Option<String> 
inFlightInstantTimestamp, String latestDeltacommitTime) {
+    // There should not be any incomplete instants on MDT
+    HoodieActiveTimeline metadataTimeline = 
metadataMetaClient.reloadActiveTimeline();
+    List<HoodieInstant> pendingInstantsOnMetadataTable = 
metadataTimeline.filterInflightsAndRequested().getInstants();
+    if (!pendingInstantsOnMetadataTable.isEmpty()) {
+      LOG.info(String.format(
+              "Cannot compact MDT as there are %d inflight instants: %s",
+              pendingInstantsOnMetadataTable.size(), 
Arrays.toString(pendingInstantsOnMetadataTable.toArray())));
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.SKIP_TABLE_SERVICES, 1));
+      return false;
     }
 
-    commit(createInstantTime, partitionToRecordsMap, false);
-  }
+    // There should not be any incomplete instants on dataset
+    HoodieActiveTimeline datasetTimeline = 
dataMetaClient.reloadActiveTimeline();
+    List<HoodieInstant> pendingInstantsOnDataset = 
datasetTimeline.filterInflightsAndRequested().getInstantsAsStream()
+            .filter(i -> !inFlightInstantTimestamp.isPresent() || 
!i.getTimestamp().equals(inFlightInstantTimestamp.get()))
+            .collect(Collectors.toList());
+    if (!pendingInstantsOnDataset.isEmpty()) {
+      LOG.info(String.format(
+              "Cannot compact MDT as there are %d inflight instants on dataset 
before latest deltacommit %s: %s",
+              pendingInstantsOnDataset.size(), latestDeltacommitTime, 
Arrays.toString(pendingInstantsOnDataset.toArray())));
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.SKIP_TABLE_SERVICES, 1));
+      return false;
+    }
 
-  private HoodieData<HoodieRecord> getFilesPartitionRecords(String 
createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord 
allPartitionRecord) {
-    HoodieData<HoodieRecord> filesPartitionRecords = 
engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
-    if (partitionInfoList.isEmpty()) {
-      return filesPartitionRecords;
+    // Check if the inflight commit is greater than all the completed commits.

Review Comment:
   Do we really need to add this constraint? Again, can you check whether 
L1215–L1234 would suffice? I am inclined to favor the liveness of MDT 
compaction — if these checks were added only out of caution and are not 
strictly needed, we should remove them.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1097,87 +1165,76 @@ protected void cleanIfNecessary(BaseHoodieWriteClient 
writeClient, String instan
     // Trigger cleaning with suffixes based on the same instant time. This 
ensures that any future
     // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
     // metadata table.
-    writeClient.clean(instantTime + "002");
+    
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
     writeClient.lazyRollbackFailedIndexing();
   }
 
   /**
-   * This is invoked to initialize metadata table for a dataset.
-   * Initial commit has special handling mechanism due to its scale compared 
to other regular commits.
-   * During cold startup, the list of files to be committed can be huge.
-   * So creating a HoodieCommitMetadata out of these large number of files,
-   * and calling the existing update(HoodieCommitMetadata) function does not 
scale well.
-   * Hence, we have a special commit just for the initialization scenario.
+   * Validates the timeline for both main and metadata tables.
    */
-  private void initialCommit(String createInstantTime, 
List<MetadataPartitionType> partitionTypes) {
-    // List all partitions in the basePath of the containing dataset
-    LOG.info("Initializing metadata table by using file listings in " + 
dataWriteConfig.getBasePath());
-    engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing 
metadata table by listing files and partitions: " + 
dataWriteConfig.getTableName());
-
-    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap 
= new HashMap<>();
-
-    // skip file system listing to populate metadata records if it's a fresh 
table.
-    // this is applicable only if the table already has N commits and metadata 
is enabled at a later point in time.
-    if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // 
SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
-      // If not, last completed commit in data table will be chosen as the 
initial commit time.
-      LOG.info("Triggering empty Commit to metadata to initialize");
-    } else {
-      List<DirectoryInfo> partitionInfoList = 
listAllPartitions(dataMetaClient);
-      Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
-          .map(p -> {
-            String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-            return Pair.of(partitionName, p.getFileNameToSizeMap());
-          })
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-      int totalDataFilesCount = 
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-      List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-      if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-        // Record which saves the list of all partitions
-        HoodieRecord allPartitionRecord = 
HoodieMetadataPayload.createPartitionListRecord(partitions);
-        HoodieData<HoodieRecord> filesPartitionRecords = 
getFilesPartitionRecords(createInstantTime, partitionInfoList, 
allPartitionRecord);
-        ValidationUtils.checkState(filesPartitionRecords.count() == 
(partitions.size() + 1));
-        partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecords);
-      }
-
-      if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && 
totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
-        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
recordsRDD);
-      }
-
-      if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && 
totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
-        partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, 
recordsRDD);
-      }
-      LOG.info("Committing " + partitions.size() + " partitions and " + 
totalDataFilesCount + " files to metadata");
+  private boolean validateTimelineBeforeSchedulingCompaction(Option<String> 
inFlightInstantTimestamp, String latestDeltacommitTime) {
+    // There should not be any incomplete instants on MDT
+    HoodieActiveTimeline metadataTimeline = 
metadataMetaClient.reloadActiveTimeline();
+    List<HoodieInstant> pendingInstantsOnMetadataTable = 
metadataTimeline.filterInflightsAndRequested().getInstants();
+    if (!pendingInstantsOnMetadataTable.isEmpty()) {
+      LOG.info(String.format(
+              "Cannot compact MDT as there are %d inflight instants: %s",
+              pendingInstantsOnMetadataTable.size(), 
Arrays.toString(pendingInstantsOnMetadataTable.toArray())));
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.SKIP_TABLE_SERVICES, 1));
+      return false;
     }
 
-    commit(createInstantTime, partitionToRecordsMap, false);
-  }
+    // There should not be any incomplete instants on dataset
+    HoodieActiveTimeline datasetTimeline = 
dataMetaClient.reloadActiveTimeline();
+    List<HoodieInstant> pendingInstantsOnDataset = 
datasetTimeline.filterInflightsAndRequested().getInstantsAsStream()
+            .filter(i -> !inFlightInstantTimestamp.isPresent() || 
!i.getTimestamp().equals(inFlightInstantTimestamp.get()))
+            .collect(Collectors.toList());
+    if (!pendingInstantsOnDataset.isEmpty()) {
+      LOG.info(String.format(
+              "Cannot compact MDT as there are %d inflight instants on dataset 
before latest deltacommit %s: %s",
+              pendingInstantsOnDataset.size(), latestDeltacommitTime, 
Arrays.toString(pendingInstantsOnDataset.toArray())));
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.SKIP_TABLE_SERVICES, 1));
+      return false;
+    }
 
-  private HoodieData<HoodieRecord> getFilesPartitionRecords(String 
createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord 
allPartitionRecord) {
-    HoodieData<HoodieRecord> filesPartitionRecords = 
engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
-    if (partitionInfoList.isEmpty()) {
-      return filesPartitionRecords;
+    // Check if the inflight commit is greater than all the completed commits.
+    Option<HoodieInstant> lastCompletedInstant = 
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    if (!lastCompletedInstant.isPresent()) {
+      LOG.info("Last completed commit is not present.");
+      return false;
+    }
+    if 
(HoodieTimeline.compareTimestamps(lastCompletedInstant.get().getTimestamp(),
+            HoodieTimeline.GREATER_THAN, inFlightInstantTimestamp.get())) {
+      // Completed commits validation failed.
+      LOG.info(String.format(
+              "Cannot compact MDT as there is %s that is greater than inflight 
instant: %s",
+              lastCompletedInstant.get(), inFlightInstantTimestamp.get()));
+      return false;
     }
 
-    HoodieData<HoodieRecord> fileListRecords = 
engineContext.parallelize(partitionInfoList, 
partitionInfoList.size()).map(partitionInfo -> {
-      Map<String, Long> fileNameToSizeMap = 
partitionInfo.getFileNameToSizeMap();
-      // filter for files that are part of the completed commits
-      Map<String, Long> validFileNameToSizeMap = 
fileNameToSizeMap.entrySet().stream().filter(fileSizePair -> {
-        String commitTime = FSUtils.getCommitTime(fileSizePair.getKey());
-        return HoodieTimeline.compareTimestamps(commitTime, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, createInstantTime);
-      }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    String latestDeltaCommitTimeInMetadataTable = 
metadataMetaClient.reloadActiveTimeline()

Review Comment:
   I see that we reload the data table timeline more than once, and possibly 
the metadata table timeline as well, even within 
validateTimelineBeforeSchedulingCompaction. Can you revisit all such reloads 
and ensure each is done only once? These reloads can be costly.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1097,87 +1165,76 @@ protected void cleanIfNecessary(BaseHoodieWriteClient 
writeClient, String instan
     // Trigger cleaning with suffixes based on the same instant time. This 
ensures that any future
     // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
     // metadata table.
-    writeClient.clean(instantTime + "002");
+    
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
     writeClient.lazyRollbackFailedIndexing();
   }
 
   /**
-   * This is invoked to initialize metadata table for a dataset.
-   * Initial commit has special handling mechanism due to its scale compared 
to other regular commits.
-   * During cold startup, the list of files to be committed can be huge.
-   * So creating a HoodieCommitMetadata out of these large number of files,
-   * and calling the existing update(HoodieCommitMetadata) function does not 
scale well.
-   * Hence, we have a special commit just for the initialization scenario.
+   * Validates the timeline for both main and metadata tables.
    */
-  private void initialCommit(String createInstantTime, 
List<MetadataPartitionType> partitionTypes) {
-    // List all partitions in the basePath of the containing dataset
-    LOG.info("Initializing metadata table by using file listings in " + 
dataWriteConfig.getBasePath());
-    engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing 
metadata table by listing files and partitions: " + 
dataWriteConfig.getTableName());
-
-    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap 
= new HashMap<>();
-
-    // skip file system listing to populate metadata records if it's a fresh 
table.
-    // this is applicable only if the table already has N commits and metadata 
is enabled at a later point in time.
-    if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // 
SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
-      // If not, last completed commit in data table will be chosen as the 
initial commit time.
-      LOG.info("Triggering empty Commit to metadata to initialize");
-    } else {
-      List<DirectoryInfo> partitionInfoList = 
listAllPartitions(dataMetaClient);
-      Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
-          .map(p -> {
-            String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-            return Pair.of(partitionName, p.getFileNameToSizeMap());
-          })
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-      int totalDataFilesCount = 
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-      List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-      if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-        // Record which saves the list of all partitions
-        HoodieRecord allPartitionRecord = 
HoodieMetadataPayload.createPartitionListRecord(partitions);
-        HoodieData<HoodieRecord> filesPartitionRecords = 
getFilesPartitionRecords(createInstantTime, partitionInfoList, 
allPartitionRecord);
-        ValidationUtils.checkState(filesPartitionRecords.count() == 
(partitions.size() + 1));
-        partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecords);
-      }
-
-      if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && 
totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
-        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
recordsRDD);
-      }
-
-      if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && 
totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
-        partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, 
recordsRDD);
-      }
-      LOG.info("Committing " + partitions.size() + " partitions and " + 
totalDataFilesCount + " files to metadata");
+  private boolean validateTimelineBeforeSchedulingCompaction(Option<String> 
inFlightInstantTimestamp, String latestDeltacommitTime) {
+    // There should not be any incomplete instants on MDT
+    HoodieActiveTimeline metadataTimeline = 
metadataMetaClient.reloadActiveTimeline();
+    List<HoodieInstant> pendingInstantsOnMetadataTable = 
metadataTimeline.filterInflightsAndRequested().getInstants();
+    if (!pendingInstantsOnMetadataTable.isEmpty()) {
+      LOG.info(String.format(
+              "Cannot compact MDT as there are %d inflight instants: %s",
+              pendingInstantsOnMetadataTable.size(), 
Arrays.toString(pendingInstantsOnMetadataTable.toArray())));
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.SKIP_TABLE_SERVICES, 1));
+      return false;
     }
 
-    commit(createInstantTime, partitionToRecordsMap, false);
-  }
+    // There should not be any incomplete instants on dataset

Review Comment:
   I believe the conditions at L1215–L1234 should subsume this condition 
(L1187–L1197). In fact, we probably had it this way about two releases back and 
evolved it since. We can probably remove L1215–L1234.
   



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1378,6 +1339,206 @@ public static Set<String> 
getInflightAndCompletedMetadataPartitions(HoodieTableC
    */
   public static boolean isIndexingCommit(String instantTime) {
     return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
METADATA_INDEXER_TIME_SUFFIX.length()
-        && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+            && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+  }
+
+  /**
+   * Delete the metadata table for the dataset and backup if required.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param context        instance of {@link HoodieEngineContext}.
+   * @param backup         Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *                       directory with name metadata_<current_timestamp>.
+   * @return The backup directory if backup was requested
+   */
+  public static String deleteMetadataTable(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context, boolean backup) {
+    final Path metadataTablePath = 
HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2());
+    FileSystem fs = FSUtils.getFs(metadataTablePath.toString(), 
context.getHadoopConf().get());
+    setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, 
false);

Review Comment:
   Shouldn't we clear all metadata partitions here, rather than only FILES?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1378,6 +1339,206 @@ public static Set<String> 
getInflightAndCompletedMetadataPartitions(HoodieTableC
    */
   public static boolean isIndexingCommit(String instantTime) {
     return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
METADATA_INDEXER_TIME_SUFFIX.length()
-        && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+            && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+  }
+
+  /**
+   * Delete the metadata table for the dataset and backup if required.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param context        instance of {@link HoodieEngineContext}.
+   * @param backup         Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *                       directory with name metadata_<current_timestamp>.
+   * @return The backup directory if backup was requested
+   */
+  public static String deleteMetadataTable(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context, boolean backup) {
+    final Path metadataTablePath = 
HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2());
+    FileSystem fs = FSUtils.getFs(metadataTablePath.toString(), 
context.getHadoopConf().get());
+    setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, 
false);
+    try {
+      if (!fs.exists(metadataTablePath)) {
+        return null;
+      }
+    } catch (FileNotFoundException e) {
+      // Ignoring exception as metadata table already does not exist
+      return null;
+    } catch (IOException e) {
+      throw new HoodieMetadataException("Failed to check metadata table 
existence", e);
+    }
+
+    if (backup) {
+      final Path metadataBackupPath = new Path(metadataTablePath.getParent(), 
".metadata_" + HoodieActiveTimeline.createNewInstantTime());
+      LOG.info("Backing up metadata directory to " + metadataBackupPath + " 
before deletion");
+      try {
+        if (fs.rename(metadataTablePath, metadataBackupPath)) {

Review Comment:
   Also, who is responsible for cleaning up the backed-up metadata table? I 
assume it is a manual step for now; if no one intervenes, the backup might be 
left around indefinitely.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1378,6 +1339,206 @@ public static Set<String> 
getInflightAndCompletedMetadataPartitions(HoodieTableC
    */
   public static boolean isIndexingCommit(String instantTime) {
     return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
METADATA_INDEXER_TIME_SUFFIX.length()
-        && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+            && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+  }
+
+  /**
+   * Delete the metadata table for the dataset and backup if required.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param context        instance of {@link HoodieEngineContext}.
+   * @param backup         Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *                       directory with name metadata_<current_timestamp>.
+   * @return The backup directory if backup was requested
+   */
+  public static String deleteMetadataTable(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context, boolean backup) {
+    final Path metadataTablePath = 
HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2());
+    FileSystem fs = FSUtils.getFs(metadataTablePath.toString(), 
context.getHadoopConf().get());
+    setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, 
false);
+    try {
+      if (!fs.exists(metadataTablePath)) {
+        return null;
+      }
+    } catch (FileNotFoundException e) {
+      // Ignoring exception as metadata table already does not exist
+      return null;
+    } catch (IOException e) {
+      throw new HoodieMetadataException("Failed to check metadata table 
existence", e);
+    }
+
+    if (backup) {
+      final Path metadataBackupPath = new Path(metadataTablePath.getParent(), 
".metadata_" + HoodieActiveTimeline.createNewInstantTime());
+      LOG.info("Backing up metadata directory to " + metadataBackupPath + " 
before deletion");
+      try {
+        if (fs.rename(metadataTablePath, metadataBackupPath)) {
+          return metadataBackupPath.toString();
+        }
+      } catch (Exception e) {
+        // If rename fails, we will ignore the backup and still delete the MDT
+        LOG.error("Failed to backup metadata table using rename", e);
+      }
+    }
+
+    LOG.info("Deleting metadata table from " + metadataTablePath);
+    try {
+      fs.delete(metadataTablePath, true);
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Failed to delete metadata table from 
path " + metadataTablePath, e);
+    }
+
+    return null;
+  }
+
+  public static HoodieTableMetaClient 
setMetadataPartitionState(HoodieTableMetaClient dataMetaClient, 
MetadataPartitionType partitionType, boolean enabled) {
+    dataMetaClient.getTableConfig().setMetadataPartitionState(partitionType, 
enabled);
+    HoodieTableConfig.update(dataMetaClient.getFs(), new 
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
+    dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+    
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionEnabled(partitionType)
 == enabled,
+            "Metadata table state change should be persisted");
+
+    LOG.info(String.format("Metadata table %s partition %s has been %s", 
dataMetaClient.getBasePathV2(), partitionType,
+            enabled ? "enabled" : "disabled"));
+    return dataMetaClient;
+  }
+
+  /**
+   * Delete a partition within the metadata table.
+   * <p>
+   * This can be used to delete a partition so that it can be re-bootstrapped.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param context        instance of {@code HoodieEngineContext}.
+   * @param backup         Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *                       directory with name metadata_<current_timestamp>.
+   * @param partitionType  The partition to delete
+   * @return The backup directory if backup was requested, null otherwise
+   */
+  public static String deleteMetadataTablePartition(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context,
+                                                    MetadataPartitionType 
partitionType, boolean backup) {
+    if (partitionType.equals(MetadataPartitionType.FILES)) {
+      return deleteMetadataTable(dataMetaClient, context, backup);
+    }
+
+    final Path metadataTablePartitionPath = new 
Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()),
 partitionType.getPartitionPath());
+    FileSystem fs = FSUtils.getFs(metadataTablePartitionPath.toString(), 
context.getHadoopConf().get());
+    setMetadataPartitionState(dataMetaClient, partitionType, false);
+    try {
+      if (!fs.exists(metadataTablePartitionPath)) {
+        return null;
+      }
+    } catch (FileNotFoundException e) {
+      // Ignoring exception as metadata table already does not exist
+      LOG.debug("Metadata table partition " + partitionType + " not found at 
path " + metadataTablePartitionPath);
+      return null;
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Failed to check metadata table 
partition existence", e);

Review Comment:
   Can we include the partition of interest in the exception message, along with the full path?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1378,6 +1339,206 @@ public static Set<String> 
getInflightAndCompletedMetadataPartitions(HoodieTableC
    */
   public static boolean isIndexingCommit(String instantTime) {
     return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
METADATA_INDEXER_TIME_SUFFIX.length()
-        && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+            && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+  }
+
+  /**
+   * Delete the metadata table for the dataset and backup if required.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param context        instance of {@link HoodieEngineContext}.
+   * @param backup         Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *                       directory with name .metadata_<current_timestamp>.
+   * @return The backup directory if backup was requested, null otherwise
+   */
+  public static String deleteMetadataTable(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context, boolean backup) {
+    final Path metadataTablePath = 
HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2());
+    FileSystem fs = FSUtils.getFs(metadataTablePath.toString(), 
context.getHadoopConf().get());
+    setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, 
false);
+    try {
+      if (!fs.exists(metadataTablePath)) {
+        return null;
+      }
+    } catch (FileNotFoundException e) {
+      // Ignoring exception as metadata table already does not exist
+      return null;
+    } catch (IOException e) {
+      throw new HoodieMetadataException("Failed to check metadata table 
existence", e);
+    }
+
+    if (backup) {
+      final Path metadataBackupPath = new Path(metadataTablePath.getParent(), 
".metadata_" + HoodieActiveTimeline.createNewInstantTime());
+      LOG.info("Backing up metadata directory to " + metadataBackupPath + " 
before deletion");
+      try {
+        if (fs.rename(metadataTablePath, metadataBackupPath)) {
+          return metadataBackupPath.toString();
+        }
+      } catch (Exception e) {
+        // If rename fails, we will ignore the backup and still delete the MDT
+        LOG.error("Failed to backup metadata table using rename", e);
+      }
+    }
+
+    LOG.info("Deleting metadata table from " + metadataTablePath);
+    try {
+      fs.delete(metadataTablePath, true);
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Failed to delete metadata table from 
path " + metadataTablePath, e);
+    }
+
+    return null;
+  }
+
+  public static HoodieTableMetaClient 
setMetadataPartitionState(HoodieTableMetaClient dataMetaClient, 
MetadataPartitionType partitionType, boolean enabled) {
+    dataMetaClient.getTableConfig().setMetadataPartitionState(partitionType, 
enabled);
+    HoodieTableConfig.update(dataMetaClient.getFs(), new 
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
+    dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+    
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionEnabled(partitionType)
 == enabled,
+            "Metadata table state change should be persisted");
+
+    LOG.info(String.format("Metadata table %s partition %s has been %s", 
dataMetaClient.getBasePathV2(), partitionType,
+            enabled ? "enabled" : "disabled"));
+    return dataMetaClient;
+  }
+
+  /**
+   * Delete a partition within the metadata table.
+   * <p>
+   * This can be used to delete a partition so that it can be re-bootstrapped.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param context        instance of {@code HoodieEngineContext}.
+   * @param backup         Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *                       directory with name .metadata_<partition>_<current_timestamp>.
+   * @param partitionType  The partition to delete
+   * @return The backup directory if backup was requested, null otherwise
+   */
+  public static String deleteMetadataTablePartition(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context,
+                                                    MetadataPartitionType 
partitionType, boolean backup) {
+    if (partitionType.equals(MetadataPartitionType.FILES)) {
+      return deleteMetadataTable(dataMetaClient, context, backup);
+    }
+
+    final Path metadataTablePartitionPath = new 
Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()),
 partitionType.getPartitionPath());
+    FileSystem fs = FSUtils.getFs(metadataTablePartitionPath.toString(), 
context.getHadoopConf().get());
+    setMetadataPartitionState(dataMetaClient, partitionType, false);
+    try {
+      if (!fs.exists(metadataTablePartitionPath)) {
+        return null;
+      }
+    } catch (FileNotFoundException e) {
+      // Ignoring exception as metadata table already does not exist
+      LOG.debug("Metadata table partition " + partitionType + " not found at 
path " + metadataTablePartitionPath);
+      return null;
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Failed to check metadata table 
partition existence", e);
+    }
+
+    if (backup) {
+      final Path metadataPartitionBackupPath = new 
Path(metadataTablePartitionPath.getParent().getParent(),
+              String.format(".metadata_%s_%s", 
partitionType.getPartitionPath(), HoodieActiveTimeline.createNewInstantTime()));
+      LOG.info(String.format("Backing up MDT partition %s to %s before 
deletion", partitionType, metadataPartitionBackupPath));
+      try {
+        if (fs.rename(metadataTablePartitionPath, 
metadataPartitionBackupPath)) {
+          return metadataPartitionBackupPath.toString();
+        }
+      } catch (Exception e) {
+        // NOTE(review): comment said we would delete on rename failure, but the delete below only runs in the !backup branch — confirm intended behavior
+        LOG.error(String.format("Failed to backup MDT partition %s using 
rename", partitionType), e);
+      }
+    } else {
+      LOG.info("Deleting metadata table partition from " + 
metadataTablePartitionPath);
+      try {
+        fs.delete(metadataTablePartitionPath, true);
+      } catch (Exception e) {
+        throw new HoodieMetadataException("Failed to delete metadata table 
partition from path " + metadataTablePartitionPath, e);
+      }
+    }
+
+    return null;
+  }
+
+  public static HoodieTableMetaClient 
setMetadataInflightPartitions(HoodieTableMetaClient dataMetaClient, 
List<MetadataPartitionType> partitionTypes) {
+    
dataMetaClient.getTableConfig().setMetadataPartitionsAsInflight(partitionTypes);
+    HoodieTableConfig.update(dataMetaClient.getFs(), new 
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
+    dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+
+    LOG.info(String.format("Metadata table %s partitions %s have been set to 
inflight", dataMetaClient.getBasePathV2(), partitionTypes));
+    return dataMetaClient;
+  }
+
+  /**
+   * Return the complete fileID for a file group within a MDT partition.
+   * <p>
+   * MDT fileGroups have the format <fileIDPrefix>-<index>. The fileIDPrefix 
is hardcoded for each MDT partition and index is an integer.
+   *
+   * @param partitionType The type of the MDT partition
+   * @param index         Index of the file group within the partition
+   * @return The fileID
+   */
+  public static String getFileIDForFileGroup(MetadataPartitionType 
partitionType, int index) {
+    return String.format("%s%04d", partitionType.getFileIdPrefix(), index);
+  }
+
+  /**
+   * Extract the index from the fileID of a file group in the MDT partition. 
See {@code getFileIDForFileGroup} for the format of the fileID.
+   *
+   * @param fileId fileID of a file group.
+   * @return The index of file group
+   */
+  public static int getFileGroupIndexFromFileId(String fileId) {

Review Comment:
   Can we add unit tests (UTs) for these static methods?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1378,6 +1339,206 @@ public static Set<String> 
getInflightAndCompletedMetadataPartitions(HoodieTableC
    */
   public static boolean isIndexingCommit(String instantTime) {
     return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
METADATA_INDEXER_TIME_SUFFIX.length()
-        && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+            && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+  }
+
+  /**
+   * Delete the metadata table for the dataset and backup if required.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param context        instance of {@link HoodieEngineContext}.
+   * @param backup         Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *                       directory with name .metadata_<current_timestamp>.
+   * @return The backup directory if backup was requested, null otherwise
+   */
+  public static String deleteMetadataTable(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context, boolean backup) {
+    final Path metadataTablePath = 
HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2());
+    FileSystem fs = FSUtils.getFs(metadataTablePath.toString(), 
context.getHadoopConf().get());
+    setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, 
false);
+    try {
+      if (!fs.exists(metadataTablePath)) {
+        return null;
+      }
+    } catch (FileNotFoundException e) {
+      // Ignoring exception as metadata table already does not exist
+      return null;
+    } catch (IOException e) {
+      throw new HoodieMetadataException("Failed to check metadata table 
existence", e);
+    }
+
+    if (backup) {
+      final Path metadataBackupPath = new Path(metadataTablePath.getParent(), 
".metadata_" + HoodieActiveTimeline.createNewInstantTime());
+      LOG.info("Backing up metadata directory to " + metadataBackupPath + " 
before deletion");
+      try {
+        if (fs.rename(metadataTablePath, metadataBackupPath)) {
+          return metadataBackupPath.toString();
+        }
+      } catch (Exception e) {
+        // If rename fails, we will ignore the backup and still delete the MDT
+        LOG.error("Failed to backup metadata table using rename", e);
+      }
+    }
+
+    LOG.info("Deleting metadata table from " + metadataTablePath);
+    try {
+      fs.delete(metadataTablePath, true);
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Failed to delete metadata table from 
path " + metadataTablePath, e);
+    }
+
+    return null;
+  }
+
+  public static HoodieTableMetaClient 
setMetadataPartitionState(HoodieTableMetaClient dataMetaClient, 
MetadataPartitionType partitionType, boolean enabled) {
+    dataMetaClient.getTableConfig().setMetadataPartitionState(partitionType, 
enabled);
+    HoodieTableConfig.update(dataMetaClient.getFs(), new 
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
+    dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+    
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionEnabled(partitionType)
 == enabled,
+            "Metadata table state change should be persisted");
+
+    LOG.info(String.format("Metadata table %s partition %s has been %s", 
dataMetaClient.getBasePathV2(), partitionType,
+            enabled ? "enabled" : "disabled"));
+    return dataMetaClient;
+  }
+
+  /**
+   * Delete a partition within the metadata table.
+   * <p>
+   * This can be used to delete a partition so that it can be re-bootstrapped.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param context        instance of {@code HoodieEngineContext}.
+   * @param backup         Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *                       directory with name metadata_<current_timestamp>.
+   * @param partitionType  The partition to delete
+   * @return The backup directory if backup was requested, null otherwise
+   */
+  public static String deleteMetadataTablePartition(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context,

Review Comment:
   May I know in which code path the backup is set to true? I could not triage 
it; every caller is setting it to false. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to