nsivabalan commented on a change in pull request #5169:
URL: https://github.com/apache/hudi/pull/5169#discussion_r838931497



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
##########
@@ -60,4 +62,12 @@
    */
   void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);
 
+  /**
+   * Drop the given metadata partitions. This path reuses DELETE_PARTITION 
operation.
+   *
+   * @param instantTime - instant time when replacecommit corresponding to the 
drop will be recorded in the metadata timeline
+   * @param partitions - list of {@link MetadataPartitionType} to drop
+   * @throws IOException
+   */
+  void dropPartitions(String instantTime, List<MetadataPartitionType> 
partitions);

Review comment:
       can we maintain the same name as writeClient. deletePartitions instead 
of dropPartitions. 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -177,4 +181,16 @@ protected void commit(String instantTime, 
Map<MetadataPartitionType, HoodieData<
     // Update total size of the metadata and count of base/log files
     metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
   }
+
+  @Override
+  public void dropPartitions(String instantTime, List<MetadataPartitionType> 
partitions) {
+    List<String> partitionsToDrop = 
partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList());
+    LOG.warn("Deleting Metadata Table partitions: " + partitionsToDrop);

Review comment:
       we could do INFO

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -445,6 +445,79 @@ private void testTableOperationsForMetaIndexImpl(final 
HoodieWriteConfig writeCo
     testTableOperationsImpl(engineContext, writeConfig);
   }
 
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testMetadataTableDeletePartition(HoodieTableType tableType) 
throws IOException {
+    initPath();
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .bloomIndexBucketizedChecking(false)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withMetadataIndexBloomFilter(true)
+            .withMetadataIndexBloomFilterFileGroups(4)
+            .withMetadataIndexColumnStats(true)
+            .withMetadataIndexBloomFilterFileGroups(2)
+            .withMetadataIndexForAllColumns(true)
+            .build())
+        .build();
+    init(tableType, writeConfig);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      // Write 1 (Bulk insert)
+      String newCommitTime = "0000001";
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      client.startCommitWithTime(newCommitTime);
+      List<WriteStatus> writeStatuses = 
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // Write 2 (upserts)
+      newCommitTime = "0000002";
+      client.startCommitWithTime(newCommitTime);
+      validateMetadata(client);
+
+      records = dataGen.generateInserts(newCommitTime, 10);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);

Review comment:
       we can remove validateMetadata here. don't think we to add these 
validations at every step of the way. we need to revisit all these validations 
and remove some of them. It just increases the total run time. 

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -445,6 +445,79 @@ private void testTableOperationsForMetaIndexImpl(final 
HoodieWriteConfig writeCo
     testTableOperationsImpl(engineContext, writeConfig);
   }
 
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testMetadataTableDeletePartition(HoodieTableType tableType) 
throws IOException {
+    initPath();
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .bloomIndexBucketizedChecking(false)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withMetadataIndexBloomFilter(true)
+            .withMetadataIndexBloomFilterFileGroups(4)
+            .withMetadataIndexColumnStats(true)
+            .withMetadataIndexBloomFilterFileGroups(2)
+            .withMetadataIndexForAllColumns(true)
+            .build())
+        .build();
+    init(tableType, writeConfig);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      // Write 1 (Bulk insert)
+      String newCommitTime = "0000001";
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      client.startCommitWithTime(newCommitTime);
+      List<WriteStatus> writeStatuses = 
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // Write 2 (upserts)
+      newCommitTime = "0000002";
+      client.startCommitWithTime(newCommitTime);
+      validateMetadata(client);
+
+      records = dataGen.generateInserts(newCommitTime, 10);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // metadata writer to delete column_stats partition
+      HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
+      assertNotNull(metadataWriter, "MetadataWriter should have been 
initialized");
+      metadataWriter.dropPartitions("0000003", 
Arrays.asList(MetadataPartitionType.COLUMN_STATS));
+
+      HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
+      List<String> metadataTablePartitions = 
FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), 
false, false);
+      // partition should still be physically present
+      assertEquals(metadataWriter.getEnabledPartitionTypes().size(), 
metadataTablePartitions.size());
+      
assertTrue(metadataTablePartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));
+
+      Option<HoodieInstant> completedReplaceInstant = 
metadataMetaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant();
+      assertTrue(completedReplaceInstant.isPresent());
+      assertEquals("0000003", completedReplaceInstant.get().getTimestamp());
+
+      final Map<String, MetadataPartitionType> metadataEnabledPartitionTypes = 
new HashMap<>();

Review comment:
       once we land the other lazy delete partitions Patch, we can trigger a 
clean here and verify that partition itself is physically deleted. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to