This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 48f89aeb1c7 [HUDI-6397][HUDI-6759] Fixing misc bugs w/ metadata table 
(#9546)
48f89aeb1c7 is described below

commit 48f89aeb1c732d1ba1a99832b908d99558ac7c9c
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Sep 6 13:56:21 2023 -0400

    [HUDI-6397][HUDI-6759] Fixing misc bugs w/ metadata table (#9546)
    
    1. This commit allows users to disable metadata using write configs cleanly.
    2. The handling of valid instants while reading from the MDT is now robust. We 
treat any special instant time (one that has an additional suffix compared to 
the DT's commit time) as valid.
    
    Especially with MDT partition initialization, the suffix is dynamic, so 
we cannot find an exact match. Instead, we go by the total instant time 
length and treat all special instant times as valid ones.
    
    In the LogRecordReader, we first ignore any uncommitted instants. Then, 
if an instant is completed in the MDT timeline, we check it against the instantRange. So it 
is safe to return true for any special instant times.
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  2 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |  6 +----
 .../org/apache/hudi/table/HoodieSparkTable.java    |  3 ++-
 .../functional/TestHoodieBackedMetadata.java       | 28 ++++++++++++++++++----
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  1 +
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 11 +++++----
 .../sink/TestStreamWriteOperatorCoordinator.java   |  9 +++----
 7 files changed, 40 insertions(+), 20 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 460bfa2c6e2..8a930ba5972 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -172,7 +172,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
 
     this.dataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
 
-    if (dataMetaClient.getTableConfig().isMetadataTableAvailable() || 
writeConfig.isMetadataTableEnabled()) {
+    if (writeConfig.isMetadataTableEnabled()) {
       this.metadataWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, 
failedWritesCleaningPolicy);
 
       try {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index f1de637edf5..101931f8c76 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -1003,12 +1003,8 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
     // Only execute metadata table deletion when all the following conditions 
are met
     // (1) This is data table
     // (2) Metadata table is disabled in HoodieWriteConfig for the writer
-    // (3) Check `HoodieTableConfig.TABLE_METADATA_PARTITIONS`.  Either the 
table config
-    // does not exist, or the table config is non-empty indicating that 
metadata table
-    // partitions are ready to use
     return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
-        && !config.isMetadataTableEnabled()
-        && !metaClient.getTableConfig().getMetadataPartitions().isEmpty();
+        && !config.isMetadataTableEnabled();
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index a5202fb7bbe..111b254634b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -91,7 +91,7 @@ public abstract class HoodieSparkTable<T>
   protected Option<HoodieTableMetadataWriter> getMetadataWriter(
       String triggeringInstantTimestamp,
       HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) {
-    if (config.isMetadataTableEnabled() || 
metaClient.getTableConfig().isMetadataTableAvailable()) {
+    if (config.isMetadataTableEnabled()) {
       // if any partition is deleted, we need to reload the metadata table 
writer so that new table configs are picked up
       // to reflect the delete mdt partitions.
       deleteMetadataIndexIfNecessary();
@@ -112,6 +112,7 @@ public abstract class HoodieSparkTable<T>
         throw new HoodieMetadataException("Checking existence of metadata 
table failed", e);
       }
     } else {
+      // if metadata is not enabled in the write config, we should try and 
delete it (if present)
       maybeDeleteMetadataTable();
     }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 26dc41f73a3..6f6c4b65b11 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -270,7 +270,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       validateMetadata(client);
     }
     // check table config
-    HoodieTableMetaClient.reload(metaClient);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTableConfig tableConfig = metaClient.getTableConfig();
     assertFalse(tableConfig.getMetadataPartitions().isEmpty());
     
assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
@@ -295,7 +295,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       validateMetadata(client);
     }
     // check table config
-    HoodieTableMetaClient.reload(metaClient);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     tableConfig = metaClient.getTableConfig();
     assertFalse(tableConfig.getMetadataPartitions().isEmpty());
     
assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
@@ -321,7 +321,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       validateMetadata(client);
     }
     // check table config
-    HoodieTableMetaClient.reload(metaClient);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     tableConfig = metaClient.getTableConfig();
     assertFalse(tableConfig.getMetadataPartitions().isEmpty());
     
assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
@@ -347,15 +347,33 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       validateMetadata(client);
     }
     // check table config
-    HoodieTableMetaClient.reload(metaClient);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     tableConfig = metaClient.getTableConfig();
     assertFalse(tableConfig.getMetadataPartitions().isEmpty());
     
assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
     
assertTrue(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
     
assertTrue(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
+
+    // disable entire MDT and validate its deleted
+    HoodieWriteConfig cfgWithMetadataDisabled = 
getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, 
HoodieFailedWritesCleaningPolicy.EAGER)
+        .withParallelism(1, 
1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
cfgWithMetadataDisabled)) {
+      // Upsert
+      String commitTime = "0000006";
+      client.startCommitWithTime(commitTime);
+      List<HoodieRecord> records = dataGen.generateUniqueUpdates(commitTime, 
10);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 
1), commitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+    }
+
+    // check table config
+    tableConfig = HoodieTableMetaClient.reload(metaClient).getTableConfig();
+    assertTrue(tableConfig.getMetadataPartitions().isEmpty());
   }
 
-  @Disabled("HUDI-6397")
   @Test
   public void testTurnOffMetadataTableAfterEnable() throws Exception {
     init(COPY_ON_WRITE, true);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 373945975be..d0ec7f020ab 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -605,6 +605,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     dataMetaClient.reloadActiveTimeline();
     if (metadataMetaClient != null) {
       metadataMetaClient.reloadActiveTimeline();
+      metadataFileSystemView.close();
       metadataFileSystemView = getFileSystemView(metadataMetaClient);
     }
     // the cached reader has max instant time restriction, they should be 
cleared
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 861f8fc8ddd..9367b7b0a07 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -153,6 +153,8 @@ public class HoodieTableMetadataUtil {
   // This suffix and all after that are used for initialization of the various 
partitions. The unused suffixes lower than this value
   // are reserved for future operations on the MDT.
   private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; // 
corresponds to "010";
+  // we have max of 4 partitions (FILES, COL_STATS, BLOOM, RLI)
+  private static final List<String> 
VALID_PARTITION_INITIALIZATION_TIME_SUFFIXES = 
Arrays.asList("010","011","012","013");
 
   /**
    * Returns whether the files partition of metadata table is ready for read.
@@ -1282,13 +1284,14 @@ public class HoodieTableMetadataUtil {
           validInstantTimestamps.addAll(getRollbackedCommits(instant, 
datasetTimeline));
         });
 
-    // add restore instants from MDT.
+    // add restore and rollback instants from MDT.
     
metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants()
-        .filter(instant -> 
instant.getAction().equals(HoodieTimeline.RESTORE_ACTION))
+        .filter(instant -> 
instant.getAction().equals(HoodieTimeline.RESTORE_ACTION) || 
instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION))
         .getInstants().forEach(instant -> 
validInstantTimestamps.add(instant.getTimestamp()));
 
-    // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid 
timestamp
-    validInstantTimestamps.add(createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, 
PARTITION_INITIALIZATION_TIME_SUFFIX));
+    
metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
+        .filter(instant ->  
instant.getTimestamp().startsWith(SOLO_COMMIT_TIMESTAMP))
+        .getInstants().forEach(instant -> 
validInstantTimestamps.add(instant.getTimestamp()));
     return validInstantTimestamps;
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index ee2f50cb20c..9e979a9fbd0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -322,19 +322,20 @@ public class TestStreamWriteOperatorCoordinator {
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), 
startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
 
     // test metadata table log compaction
-    // write another 5 commits
-    for (int i = 1; i < 6; i++) {
+    // already 1 commit is used to initialized FILES partition in MDT
+    // write another 4 commits
+    for (int i = 1; i < 5; i++) {
       instant = mockWriteWithMetadata();
       metadataTableMetaClient.reloadActiveTimeline();
       completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
       assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(i + 1));
       assertThat(completedTimeline.lastInstant().get().getTimestamp(), 
is(instant));
     }
-    // the 6th commit triggers the log compaction
+    // the 5th commit triggers the log compaction
     mockWriteWithMetadata();
     metadataTableMetaClient.reloadActiveTimeline();
     completedTimeline = 
metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
-    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(8));
+    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(7));
     assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), 
is(instant + "005"));
     // log compaction is another delta commit
     assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), 
is(HoodieTimeline.DELTA_COMMIT_ACTION));

Reply via email to