This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 11e64b2  [HUDI-1717] Metadata Reader should merge all the un-synced 
but complete instants from the dataset timeline. (#3082)
11e64b2 is described below

commit 11e64b2db0ddf8f816561f8442b373de15a26d71
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Jun 22 08:52:18 2021 -0700

    [HUDI-1717] Metadata Reader should merge all the un-synced but complete 
instants from the dataset timeline. (#3082)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  4 +--
 .../hudi/metadata/HoodieTableMetadataWriter.java   |  6 ++++
 .../SparkHoodieBackedTableMetadataWriter.java      | 19 +++++++++++
 .../hudi/metadata/TestHoodieBackedMetadata.java    | 38 ++++++++++++++++++++++
 .../apache/hudi/metadata/BaseTableMetadata.java    | 34 +++++++++++++++++--
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 35 ++++++++++----------
 6 files changed, 114 insertions(+), 22 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 38a0447..034465d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -400,7 +400,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     // (re) init the metadata for reading.
     initTableMetadata();
     try {
-      List<HoodieInstant> instantsToSync = metadata.findInstantsToSync();
+      List<HoodieInstant> instantsToSync = 
metadata.findInstantsToSyncForWriter();
       if (instantsToSync.isEmpty()) {
         return;
       }
@@ -411,7 +411,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
       for (HoodieInstant instant : instantsToSync) {
         LOG.info("Syncing instant " + instant + " to metadata table");
 
-        Option<List<HoodieRecord>> records = 
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, 
metadata.getSyncedInstantTime());
+        Option<List<HoodieRecord>> records = 
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, 
getLatestSyncedInstantTime());
         if (records.isPresent()) {
           commit(records.get(), MetadataPartitionType.FILES.partitionPath(), 
instant.getTimestamp());
         }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
index 02c5b9e..1b02a3b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.util.Option;
 
 import java.io.Serializable;
 
@@ -40,4 +41,9 @@ public interface HoodieTableMetadataWriter extends 
Serializable, AutoCloseable {
   void update(HoodieRestoreMetadata restoreMetadata, String instantTime);
 
   void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);
+
+  /**
+   * Return the timestamp of the latest instant synced to the metadata table.
+   */
+  Option<String> getLatestSyncedInstantTime();
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 7c12a9e..c014e8b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -133,6 +135,23 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
   }
 
   /**
+   * Return the timestamp of the latest instant synced.
+   *
+   * To sync an instant on the dataset, we create a corresponding delta-commit on 
the metadata table. So return the latest
+   * delta-commit.
+   */
+  @Override
+  public Option<String> getLatestSyncedInstantTime() {
+    if (!enabled) {
+      return Option.empty();
+    }
+
+    HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
+    return timeline.getDeltaCommitTimeline().filterCompletedInstants()
+        .lastInstant().map(HoodieInstant::getTimestamp);
+  }
+
+  /**
    * Tag each record with the location.
    *
    * Since we only read the latest base file in a partition, we tag the 
records with the instant time of the latest
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index ab731f5..79ccd37 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -491,6 +491,8 @@ public class TestHoodieBackedMetadata extends 
HoodieClientTestHarness {
 
     // Various table operations without metadata table enabled
     String restoreToInstant;
+    String inflightActionTimestamp;
+    String beforeInflightActionTimestamp;
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, false))) {
       // updates
       newCommitTime = HoodieActiveTimeline.createNewInstantTime();
@@ -523,6 +525,10 @@ public class TestHoodieBackedMetadata extends 
HoodieClientTestHarness {
         assertTrue(metadata(client).isInSync());
       }
 
+      // Record a timestamp for creating an inflight instant for sync testing
+      inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime();
+      beforeInflightActionTimestamp = newCommitTime;
+
       // Deletes
       newCommitTime = HoodieActiveTimeline.createNewInstantTime();
       records = dataGen.generateDeletes(newCommitTime, 5);
@@ -554,9 +560,41 @@ public class TestHoodieBackedMetadata extends 
HoodieClientTestHarness {
       assertTrue(metadata(client).isInSync());
     }
 
+    // If there is an incomplete operation, the Metadata Table is not updated 
beyond that operation, but the
+    // in-memory merge should consider all the completed operations.
+    Path inflightCleanPath = new Path(metaClient.getMetaPath(), 
HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp));
+    fs.create(inflightCleanPath).close();
+
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, true))) {
       // Restore cannot be done until the metadata table is in sync. See 
HUDI-1502 for details
       client.syncTableMetadata();
+
+      // Table should sync only before the inflightActionTimestamp
+      HoodieBackedTableMetadataWriter writer =
+          
(HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf,
 client.getConfig(), context);
+      assertEquals(writer.getLatestSyncedInstantTime().get(), 
beforeInflightActionTimestamp);
+
+      // Reader should sync to all the completed instants
+      HoodieTableMetadata metadata  = HoodieTableMetadata.create(context, 
client.getConfig().getMetadataConfig(),
+          client.getConfig().getBasePath(), 
FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR);
+      assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime);
+
+      // Remove the inflight instant holding back table sync
+      fs.delete(inflightCleanPath, false);
+      client.syncTableMetadata();
+
+      writer =
+          
(HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf,
 client.getConfig(), context);
+      assertEquals(writer.getLatestSyncedInstantTime().get(), newCommitTime);
+
+      // Reader should sync to all the completed instants
+      metadata  = HoodieTableMetadata.create(context, 
client.getConfig().getMetadataConfig(),
+          client.getConfig().getBasePath(), 
FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR);
+      assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime);
+    }
+
+    // Enable metadata table and ensure it is synced
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, true))) {
       client.restoreToInstant(restoreToInstant);
       assertFalse(metadata(client).isInSync());
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 0092853..85a4d69 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -62,6 +62,7 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
   protected final HoodieMetadataConfig metadataConfig;
   // Directory used for Spillable Map when merging records
   protected final String spillableMapDirectory;
+  private String syncedInstantTime;
 
   protected boolean enabled;
   private TimelineMergedTableMetadata timelineMergedMetadata;
@@ -277,17 +278,44 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
 
   private void openTimelineScanner() {
     if (timelineMergedMetadata == null) {
-      List<HoodieInstant> unSyncedInstants = findInstantsToSync();
+      List<HoodieInstant> unSyncedInstants = findInstantsToSyncForReader();
       timelineMergedMetadata =
           new TimelineMergedTableMetadata(datasetMetaClient, unSyncedInstants, 
getSyncedInstantTime(), null);
+
+      syncedInstantTime = unSyncedInstants.isEmpty() ? 
getLatestDatasetInstantTime()
+          : unSyncedInstants.get(unSyncedInstants.size() - 1).getTimestamp();
+    }
+  }
+
+  /**
+   * Return the timestamp of the latest synced instant.
+   */
+  @Override
+  public Option<String> getSyncedInstantTime() {
+    if (!enabled) {
+      return Option.empty();
     }
+
+    return Option.ofNullable(syncedInstantTime);
   }
 
-  protected abstract List<HoodieInstant> findInstantsToSync();
+  /**
+   * Return the instants which are not-synced to the {@code 
HoodieTableMetadata}.
+   *
+   * This is the list of all completed but un-synced instants.
+   */
+  protected abstract List<HoodieInstant> findInstantsToSyncForReader();
+
+  /**
+   * Return the instants which are not-synced to the {@code 
HoodieTableMetadataWriter}.
+   *
+   * This is the list of all completed but un-synced instants which do not 
have any incomplete instants in between them.
+   */
+  protected abstract List<HoodieInstant> findInstantsToSyncForWriter();
 
   @Override
   public boolean isInSync() {
-    return enabled && findInstantsToSync().isEmpty();
+    return enabled && findInstantsToSyncForWriter().isEmpty();
   }
 
   protected HoodieEngineContext getEngineContext() {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 66e8f41..f374d61 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -265,7 +264,22 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
    * Return an ordered list of instants which have not been synced to the 
Metadata Table.
    */
   @Override
-  protected List<HoodieInstant> findInstantsToSync() {
+  protected List<HoodieInstant> findInstantsToSyncForReader() {
+    return findInstantsToSync(true);
+  }
+
+  /**
+   * Return an ordered list of instants which have not been synced to the 
Metadata Table.
+   */
+  @Override
+  protected List<HoodieInstant> findInstantsToSyncForWriter() {
+    return findInstantsToSync(false);
+  }
+
+  /**
+   * Return an ordered list of instants which have not been synced to the 
Metadata Table.
+   */
+  private List<HoodieInstant> findInstantsToSync(boolean 
ignoreIncompleteInstants) {
     initIfNeeded();
 
     // if there are no instants yet, return empty list, since there is nothing 
to sync here.
@@ -277,7 +291,8 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     // are candidates for sync.
     String latestMetadataInstantTime = 
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp();
     HoodieDefaultTimeline candidateTimeline = 
datasetMetaClient.getActiveTimeline().findInstantsAfter(latestMetadataInstantTime,
 Integer.MAX_VALUE);
-    Option<HoodieInstant> earliestIncompleteInstant = 
candidateTimeline.filterInflightsAndRequested().firstInstant();
+    Option<HoodieInstant> earliestIncompleteInstant = ignoreIncompleteInstants 
? Option.empty()
+        : candidateTimeline.filterInflightsAndRequested().firstInstant();
 
     if (earliestIncompleteInstant.isPresent()) {
       return candidateTimeline.filterCompletedInstants()
@@ -289,20 +304,6 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     }
   }
 
-  /**
-   * Return the timestamp of the latest compaction instant.
-   */
-  @Override
-  public Option<String> getSyncedInstantTime() {
-    if (!enabled) {
-      return Option.empty();
-    }
-
-    HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
-    return timeline.getDeltaCommitTimeline().filterCompletedInstants()
-        .lastInstant().map(HoodieInstant::getTimestamp);
-  }
-
   public boolean enabled() {
     return enabled;
   }

Reply via email to