prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192169835


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1378,6 +1339,206 @@ public static Set<String> 
getInflightAndCompletedMetadataPartitions(HoodieTableC
    */
   public static boolean isIndexingCommit(String instantTime) {
     return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
METADATA_INDEXER_TIME_SUFFIX.length()
-        && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+            && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+  }
+
+  /**
+   * Delete the metadata table for the dataset and backup if required.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param context        instance of {@link HoodieEngineContext}.
+   * @param backup         Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *                       directory with name metadata_<current_timestamp>.
+   * @return The backup directory if backup was requested
+   */
+  public static String deleteMetadataTable(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context, boolean backup) {
+    final Path metadataTablePath = 
HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2());
+    FileSystem fs = FSUtils.getFs(metadataTablePath.toString(), 
context.getHadoopConf().get());
+    setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, 
false);
+    try {
+      if (!fs.exists(metadataTablePath)) {
+        return null;
+      }
+    } catch (FileNotFoundException e) {
+      // Ignoring exception as metadata table already does not exist
+      return null;
+    } catch (IOException e) {
+      throw new HoodieMetadataException("Failed to check metadata table 
existence", e);
+    }
+
+    if (backup) {
+      final Path metadataBackupPath = new Path(metadataTablePath.getParent(), 
".metadata_" + HoodieActiveTimeline.createNewInstantTime());
+      LOG.info("Backing up metadata directory to " + metadataBackupPath + " 
before deletion");
+      try {
+        if (fs.rename(metadataTablePath, metadataBackupPath)) {
+          return metadataBackupPath.toString();
+        }
+      } catch (Exception e) {
+        // If rename fails, we will ignore the backup and still delete the MDT
+        LOG.error("Failed to backup metadata table using rename", e);
+      }
+    }
+
+    LOG.info("Deleting metadata table from " + metadataTablePath);
+    try {
+      fs.delete(metadataTablePath, true);
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Failed to delete metadata table from 
path " + metadataTablePath, e);
+    }
+
+    return null;
+  }
+
+  public static HoodieTableMetaClient 
setMetadataPartitionState(HoodieTableMetaClient dataMetaClient, 
MetadataPartitionType partitionType, boolean enabled) {
+    dataMetaClient.getTableConfig().setMetadataPartitionState(partitionType, 
enabled);
+    HoodieTableConfig.update(dataMetaClient.getFs(), new 
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
+    dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+    
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionEnabled(partitionType)
 == enabled,
+            "Metadata table state change should be persisted");
+
+    LOG.info(String.format("Metadata table %s partition %s has been %s", 
dataMetaClient.getBasePathV2(), partitionType,
+            enabled ? "enabled" : "disabled"));
+    return dataMetaClient;
+  }
+
+  /**
+   * Delete a partition within the metadata table.
+   * <p>
+   * This can be used to delete a partition so that it can be re-bootstrapped.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param context        instance of {@code HoodieEngineContext}.
+   * @param backup         Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *                       directory with name metadata_<current_timestamp>.
+   * @param partitionType  The partition to delete
+   * @return The backup directory if backup was requested, null otherwise
+   */
+  public static String deleteMetadataTablePartition(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context,
+                                                    MetadataPartitionType 
partitionType, boolean backup) {
+    if (partitionType.equals(MetadataPartitionType.FILES)) {
+      return deleteMetadataTable(dataMetaClient, context, backup);
+    }
+
+    final Path metadataTablePartitionPath = new 
Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()),
 partitionType.getPartitionPath());
+    FileSystem fs = FSUtils.getFs(metadataTablePartitionPath.toString(), 
context.getHadoopConf().get());
+    setMetadataPartitionState(dataMetaClient, partitionType, false);
+    try {
+      if (!fs.exists(metadataTablePartitionPath)) {
+        return null;
+      }
+    } catch (FileNotFoundException e) {
+      // Ignoring exception as metadata table already does not exist
+      LOG.debug("Metadata table partition " + partitionType + " not found at 
path " + metadataTablePartitionPath);
+      return null;
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Failed to check metadata table 
partition existence", e);

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to