This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 221ddd9  [HUDI-2016] Fixed bootstrap of Metadata Table when some 
actions are in progress. (#3083)
221ddd9 is described below

commit 221ddd9bf3899e3672210404d51e686770ba446d
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Jul 6 08:08:46 2021 -0700

    [HUDI-2016] Fixed bootstrap of Metadata Table when some actions are in 
progress. (#3083)
    
    The Metadata Table cannot be bootstrapped while any action is in progress on 
the dataset; this is detected by the presence of inflight or requested instants. 
Bootstrapping is initiated in the preWrite and postWrite of each commit, so it 
will be retried on each commit until it succeeds.
    Also added metrics that are emitted when bootstrapping fails or a table is 
re-bootstrapped. These metrics will help detect tables that are not getting 
bootstrapped.
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 33 ++++++-----
 .../hudi/metadata/TestHoodieBackedMetadata.java    | 64 ++++++++++++++--------
 .../hudi/metadata/HoodieMetadataMetrics.java       |  2 +
 3 files changed, 61 insertions(+), 38 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index b81e00b..6a40d62 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -250,13 +250,14 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
         rebootstrap = true;
       } else if 
(datasetMetaClient.getActiveTimeline().isBeforeTimelineStarts(latestMetadataInstant.get().getTimestamp()))
 {
         LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced 
instants have been archived."
-            + "latestMetadataInstant=" + 
latestMetadataInstant.get().getTimestamp()
+            + " latestMetadataInstant=" + 
latestMetadataInstant.get().getTimestamp()
             + ", latestDatasetInstant=" + 
datasetMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
         rebootstrap = true;
       }
     }
 
     if (rebootstrap) {
+      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.REBOOTSTRAP_STR, 1));
       LOG.info("Deleting Metadata Table directory so that it can be 
re-bootstrapped");
       datasetMetaClient.getFs().delete(new 
Path(metadataWriteConfig.getBasePath()), true);
       exists = false;
@@ -264,8 +265,9 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
 
     if (!exists) {
       // Initialize for the first time by listing partitions and files 
directly from the file system
-      bootstrapFromFilesystem(engineContext, datasetMetaClient);
-      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      if (bootstrapFromFilesystem(engineContext, datasetMetaClient)) {
+        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      }
     }
   }
 
@@ -274,22 +276,22 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
    *
    * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
    */
-  private void bootstrapFromFilesystem(HoodieEngineContext engineContext, 
HoodieTableMetaClient datasetMetaClient) throws IOException {
+  private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, 
HoodieTableMetaClient datasetMetaClient) throws IOException {
     ValidationUtils.checkState(enabled, "Metadata table cannot be initialized 
as it is not enabled");
 
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
-    // Otherwise, we use the timestamp of the instant which does not have any 
non-completed instants before it.
-    Option<HoodieInstant> latestInstant = Option.empty();
-    boolean foundNonComplete = false;
-    for (HoodieInstant instant : 
datasetMetaClient.getActiveTimeline().getInstants().collect(Collectors.toList()))
 {
-      if (!instant.isCompleted()) {
-        foundNonComplete = true;
-      } else if (!foundNonComplete) {
-        latestInstant = Option.of(instant);
-      }
+    // We can only bootstrap if there are no pending operations on the dataset
+    Option<HoodieInstant> pendingInstantOption = 
Option.fromJavaOptional(datasetMetaClient.getActiveTimeline()
+        .getReverseOrderedInstants().filter(i -> 
!i.isCompleted()).findFirst());
+    if (pendingInstantOption.isPresent()) {
+      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
+      LOG.warn("Cannot bootstrap metadata table as operation is in progress: " 
+ pendingInstantOption.get());
+      return false;
     }
 
-    String createInstantTime = 
latestInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
+    // Otherwise, we use the latest commit timestamp.
+    String createInstantTime = 
datasetMetaClient.getActiveTimeline().getReverseOrderedInstants().findFirst()
+        .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
     LOG.info("Creating a new metadata table in " + 
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
 
     HoodieTableMetaClient.withPropertyBuilder()
@@ -335,6 +337,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
 
     LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " 
+ stats[0] + " files to metadata");
     update(commitMetadata, createInstantTime);
+    return true;
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index 3ae722f..0d0ca1e 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -109,10 +109,10 @@ public class TestHoodieBackedMetadata extends 
HoodieClientTestHarness {
   }
 
   /**
-   * Metadata Table should not be created unless it is enabled in config.
+   * Metadata Table bootstrap scenarios.
    */
   @Test
-  public void testDefaultNoMetadataTable() throws Exception {
+  public void testMetadataTableBootstrap() throws Exception {
     init(HoodieTableType.COPY_ON_WRITE);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
@@ -121,46 +121,63 @@ public class TestHoodieBackedMetadata extends 
HoodieClientTestHarness {
     assertThrows(TableNotFoundException.class, () -> 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
 
     // Metadata table is not created if disabled by config
+    String firstCommitTime = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, false))) {
-      client.startCommitWithTime("001");
-      client.insert(jsc.emptyRDD(), "001");
+      client.startCommitWithTime(firstCommitTime);
+      client.insert(jsc.parallelize(dataGen.generateInserts(firstCommitTime, 
5)), firstCommitTime);
       assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table 
should not be created");
       assertThrows(TableNotFoundException.class, () -> 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
     }
 
+    // Metadata table should not be created if any non-complete instants are 
present
+    String secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(false, true), true)) {
+      client.startCommitWithTime(secondCommitTime);
+      client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 
2)), secondCommitTime);
+      // AutoCommit is false so no bootstrap
+      client.syncTableMetadata();
+      assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table 
should not be created");
+      assertThrows(TableNotFoundException.class, () -> 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
+      // rollback this commit
+      client.rollback(secondCommitTime);
+    }
+
     // Metadata table created when enabled by config & sync is called
+    secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, true), true)) {
-      client.startCommitWithTime("002");
-      client.insert(jsc.emptyRDD(), "002");
+      client.startCommitWithTime(secondCommitTime);
+      client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 
2)), secondCommitTime);
       client.syncTableMetadata();
       assertTrue(fs.exists(new Path(metadataTableBasePath)));
       validateMetadata(client);
     }
 
-    // Delete the 001  and 002 instants and introduce a 003. This should 
trigger a rebootstrap of the metadata
-    // table as un-synched instants have been "archived".
-    // Metadata Table should not have 001 and 002 delta-commits as it was 
re-bootstrapped
+    // Delete all existing instants on dataset to simulate archiving. This 
should trigger a re-bootstrap of the metadata
+    // table as last synched instant has been "archived".
     final String metadataTableMetaPath = metadataTableBasePath + 
Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
-    assertTrue(fs.exists(new Path(metadataTableMetaPath, 
HoodieTimeline.makeDeltaFileName("001"))));
-    assertTrue(fs.exists(new Path(metadataTableMetaPath, 
HoodieTimeline.makeDeltaFileName("002"))));
-    Arrays.stream(fs.globStatus(new Path(metaClient.getMetaPath(), 
"{001,002}.*"))).forEach(s -> {
-      try {
-        fs.delete(s.getPath(), false);
-      } catch (IOException e) {
-        LOG.warn("Error when deleting instant " + s + ": " + e);
-      }
-    });
+    assertTrue(fs.exists(new Path(metadataTableMetaPath, 
HoodieTimeline.makeDeltaFileName(secondCommitTime))));
+
+    Arrays.stream(fs.listStatus(new 
Path(metaClient.getMetaPath()))).filter(status -> 
status.getPath().getName().matches("^\\d+\\..*"))
+        .forEach(status -> {
+          try {
+            fs.delete(status.getPath(), false);
+          } catch (IOException e) {
+            LOG.warn("Error when deleting instant " + status + ": " + e);
+          }
+        });
 
+    String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, true), true)) {
-      client.startCommitWithTime("003");
-      client.insert(jsc.emptyRDD(), "003");
+      client.startCommitWithTime(thirdCommitTime);
+      client.insert(jsc.parallelize(dataGen.generateUpdates(thirdCommitTime, 
2)), thirdCommitTime);
       client.syncTableMetadata();
       assertTrue(fs.exists(new Path(metadataTableBasePath)));
       validateMetadata(client);
 
-      // Metadata Table should not have 001 and 002 delta-commits as it was 
re-bootstrapped
-      assertFalse(fs.exists(new Path(metadataTableMetaPath, 
HoodieTimeline.makeDeltaFileName("001"))));
-      assertFalse(fs.exists(new Path(metadataTableMetaPath, 
HoodieTimeline.makeDeltaFileName("002"))));
+      // Metadata Table should not have previous delta-commits as it was 
re-bootstrapped
+      assertFalse(fs.exists(new Path(metadataTableMetaPath, 
HoodieTimeline.makeDeltaFileName(firstCommitTime))));
+      assertFalse(fs.exists(new Path(metadataTableMetaPath, 
HoodieTimeline.makeDeltaFileName(secondCommitTime))));
+      assertTrue(fs.exists(new Path(metadataTableMetaPath, 
HoodieTimeline.makeDeltaFileName(thirdCommitTime))));
     }
   }
 
@@ -195,6 +212,7 @@ public class TestHoodieBackedMetadata extends 
HoodieClientTestHarness {
         
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withDirectoryFilterRegex(filterDirRegex).build()).build();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
       client.startCommitWithTime("005");
+      client.insert(jsc.emptyRDD(), "005");
 
       List<String> partitions = 
metadataWriter(client).metadata().getAllPartitionPaths();
       assertFalse(partitions.contains(nonPartitionDirectory),
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
index 2bd773b..9f1b0a0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
@@ -48,6 +48,8 @@ public class HoodieMetadataMetrics implements Serializable {
   public static final String BASEFILE_READ_STR = "basefile_read";
   public static final String INITIALIZE_STR = "initialize";
   public static final String SYNC_STR = "sync";
+  public static final String REBOOTSTRAP_STR = "rebootstrap";
+  public static final String BOOTSTRAP_ERR_STR = "bootstrap_error";
 
   // Stats names
   public static final String STAT_TOTAL_BASE_FILE_SIZE = 
"totalBaseFileSizeInBytes";

Reply via email to