This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new be3f8fcd000 [HUDI-7701] Metadata table initialization with pending 
instants (#11137)
be3f8fcd000 is described below

commit be3f8fcd000386c0acc1da4aba6a8a51ee6b2b5d
Author: Danny Chan <[email protected]>
AuthorDate: Mon May 6 16:44:21 2024 +0800

    [HUDI-7701] Metadata table initialization with pending instants (#11137)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 49 +++++++++-------------
 .../common/testutils/HoodieMetadataTestTable.java  |  8 ----
 .../functional/TestHoodieBackedMetadata.java       | 34 ++++++++++++++-
 .../functional/TestHoodieMetadataBootstrap.java    | 20 ++++-----
 .../apache/hudi/io/TestHoodieTimelineArchiver.java | 14 ++++---
 .../hudi/table/TestHoodieMergeOnReadTable.java     | 27 ++++++------
 .../table/timeline/HoodieInstantTimeGenerator.java | 13 ++++--
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 19 ++-------
 .../hudi/common/testutils/HoodieTestTable.java     |  8 ++++
 .../hudi/metadata/TestHoodieTableMetadataUtil.java |  3 +-
 10 files changed, 105 insertions(+), 90 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index a8aa7f3725d..f01b5bd3a08 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -82,7 +82,6 @@ import org.slf4j.LoggerFactory;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -326,9 +325,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    */
   private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
                                            Option<String> 
inflightInstantTimestamp) throws IOException {
-    if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
-      return false;
-    }
+    Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient);
 
     // FILES partition is always required and is initialized first
     boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(FILES);
@@ -354,11 +351,11 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     // Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
     List<DirectoryInfo> partitionInfoList;
     if (filesPartitionAvailable) {
-      partitionInfoList = listAllPartitionsFromMDT(initializationTime);
+      partitionInfoList = listAllPartitionsFromMDT(initializationTime, 
pendingDataInstants);
     } else {
       // if auto initialization is enabled, then we need to list all 
partitions from the file system
       if (dataWriteConfig.getMetadataConfig().shouldAutoInitialize()) {
-        partitionInfoList = 
listAllPartitionsFromFilesystem(initializationTime);
+        partitionInfoList = 
listAllPartitionsFromFilesystem(initializationTime, pendingDataInstants);
       } else {
         // if auto initialization is disabled, we can return an empty list
         partitionInfoList = Collections.emptyList();
@@ -458,16 +455,16 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    * @return a unique timestamp for MDT
    */
   private String generateUniqueCommitInstantTime(String initializationTime) {
-    // if it's initialized via Async indexer, we don't need to alter the init 
time
+    // If it's initialized via Async indexer, we don't need to alter the init 
time.
+    // otherwise yields the timestamp on the fly.
+    // This function would be called multiple times in a single application if 
multiple indexes are being
+    // initialized one after the other.
     HoodieTimeline dataIndexTimeline = 
dataMetaClient.getActiveTimeline().filter(instant -> 
instant.getAction().equals(HoodieTimeline.INDEXING_ACTION));
     if (HoodieTableMetadataUtil.isIndexingCommit(dataIndexTimeline, 
initializationTime)) {
       return initializationTime;
     }
-    // Add suffix to initializationTime to find an unused instant time for the 
next index initialization.
-    // This function would be called multiple times in a single application if 
multiple indexes are being
-    // initialized one after the other.
     for (int offset = 0; ; ++offset) {
-      final String commitInstantTime = 
HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+      final String commitInstantTime = 
HoodieInstantTimeGenerator.instantTimePlusMillis(SOLO_COMMIT_TIMESTAMP, offset);
       if 
(!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
         return commitInstantTime;
       }
@@ -602,22 +599,14 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     return Pair.of(fileGroupCount, allPartitionsRecord.union(fileListRecords));
   }
 
-  private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, 
Option<String> inflightInstantTimestamp) {
-    // We can only initialize if there are no pending operations on the dataset
-    List<HoodieInstant> pendingDataInstant = dataMetaClient.getActiveTimeline()
+  private Set<String> getPendingDataInstants(HoodieTableMetaClient 
dataMetaClient) {
+    // Initialize excluding the pending operations on the dataset
+    return dataMetaClient.getActiveTimeline()
         .getInstantsAsStream().filter(i -> !i.isCompleted())
-        .filter(i -> !inflightInstantTimestamp.isPresent() || 
!i.getTimestamp().equals(inflightInstantTimestamp.get()))
         // regular writers should not be blocked due to pending indexing action
         .filter(i -> !HoodieTimeline.INDEXING_ACTION.equals(i.getAction()))
-        .collect(Collectors.toList());
-
-    if (!pendingDataInstant.isEmpty()) {
-      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
-      LOG.warn("Cannot initialize metadata table as operation(s) are in 
progress on the dataset: {}",
-          Arrays.toString(pendingDataInstant.toArray()));
-      return true;
-    }
-    return false;
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toSet());
   }
 
   private HoodieTableMetaClient initializeMetaClient() throws IOException {
@@ -643,9 +632,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    * Function to find hoodie partitions and list files in them in parallel.
    *
    * @param initializationTime Files which have a timestamp after this are 
neglected
+   * @param pendingDataInstants Pending instants on data set
    * @return List consisting of {@code DirectoryInfo} for each partition found.
    */
-  private List<DirectoryInfo> listAllPartitionsFromFilesystem(String 
initializationTime) {
+  private List<DirectoryInfo> listAllPartitionsFromFilesystem(String 
initializationTime, Set<String> pendingDataInstants) {
     List<StoragePath> pathsToList = new LinkedList<>();
     pathsToList.add(new StoragePath(dataWriteConfig.getBasePath()));
 
@@ -664,7 +654,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       List<DirectoryInfo> processedDirectories = 
engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
         HoodieStorage storage = HoodieStorageUtils.getStorage(path, 
storageConf);
         String relativeDirPath = 
FSUtils.getRelativePartitionPath(storageBasePath, path);
-        return new DirectoryInfo(relativeDirPath, 
storage.listDirectEntries(path), initializationTime);
+        return new DirectoryInfo(relativeDirPath, 
storage.listDirectEntries(path), initializationTime, pendingDataInstants);
       }, numDirsToList);
 
       pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, 
pathsToList.size()));
@@ -697,15 +687,16 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    * Function to find hoodie partitions and list files in them in parallel 
from MDT.
    *
    * @param initializationTime Files which have a timestamp after this are 
neglected
+   * @param pendingDataInstants Files coming from pending instants are 
neglected
    * @return List consisting of {@code DirectoryInfo} for each partition found.
    */
-  private List<DirectoryInfo> listAllPartitionsFromMDT(String 
initializationTime) throws IOException {
+  private List<DirectoryInfo> listAllPartitionsFromMDT(String 
initializationTime, Set<String> pendingDataInstants) throws IOException {
     List<DirectoryInfo> dirinfoList = new LinkedList<>();
     List<String> allPartitionPaths = metadata.getAllPartitionPaths().stream()
         .map(partitionPath -> dataWriteConfig.getBasePath() + 
StoragePath.SEPARATOR_CHAR + partitionPath).collect(Collectors.toList());
     Map<String, List<StoragePathInfo>> partitionFileMap = 
metadata.getAllFilesInPartitions(allPartitionPaths);
     for (Map.Entry<String, List<StoragePathInfo>> entry : 
partitionFileMap.entrySet()) {
-      dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(), 
initializationTime));
+      dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(), 
initializationTime, pendingDataInstants));
     }
     return dirinfoList;
   }
@@ -1042,7 +1033,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
 
     // Restore requires the existing pipelines to be shutdown. So we can 
safely scan the dataset to find the current
     // list of files in the filesystem.
-    List<DirectoryInfo> dirInfoList = 
listAllPartitionsFromFilesystem(instantTime);
+    List<DirectoryInfo> dirInfoList = 
listAllPartitionsFromFilesystem(instantTime, Collections.emptySet());
     Map<String, DirectoryInfo> dirInfoMap = 
dirInfoList.stream().collect(Collectors.toMap(DirectoryInfo::getRelativePath, 
Function.identity()));
     dirInfoList.clear();
 
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
index ae6de2f680d..c2ae9024f2a 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
@@ -114,14 +114,6 @@ public class HoodieMetadataTestTable extends 
HoodieTestTable {
     return this;
   }
 
-  public HoodieTestTable moveInflightCommitToComplete(String instantTime, 
HoodieCommitMetadata metadata, boolean ignoreWriter) throws IOException {
-    super.moveInflightCommitToComplete(instantTime, metadata);
-    if (!ignoreWriter && writer != null) {
-      writer.updateFromWriteStatuses(metadata, 
context.get().emptyHoodieData(), instantTime);
-    }
-    return this;
-  }
-
   @Override
   public HoodieTestTable moveInflightCompactionToComplete(String instantTime, 
HoodieCommitMetadata metadata) throws IOException {
     super.moveInflightCompactionToComplete(instantTime, metadata);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index fc55c6723ee..52938c98547 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -925,6 +925,38 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     
assertEquals(HoodieInstantTimeGenerator.instantTimeMinusMillis(inflightInstant2,
 1L), tableMetadata.getLatestCompactionTime().get());
   }
 
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testInitializeMetadataTableWithPendingInstant(HoodieTableType 
tableType) throws Exception {
+    init(tableType, false);
+    initWriteConfigAndMetatableWriter(writeConfig, false);
+    // 1. firstly we disable the metadata table, then create two completed 
commits and one inflight commit.
+    // 1.1write 2 commits first.
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
+
+    // 1.2 create another inflight commit
+    String inflightInstant = metaClient.createNewInstantTime();
+    HoodieCommitMetadata inflightCommitMeta = 
testTable.doWriteOperation(inflightInstant, UPSERT, emptyList(),
+        asList("p1", "p2"), 2, false, true);
+    doWriteOperation(testTable, metaClient.createNewInstantTime());
+
+    // 2. now enable the metadata table and triggers the initialization
+    writeConfig = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .enableMetrics(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(4)
+            .build()).build();
+
+    // 2.1 initializes the metadata table, it will exclude the files from the 
inflight instant.
+    initWriteConfigAndMetatableWriter(writeConfig, true);
+
+    // 2.2 move inflight to completed
+    testTable.moveInflightCommitToComplete(inflightInstant, 
inflightCommitMeta);
+    validateMetadata(testTable, true);
+  }
+
   /**
    * Tests that virtual key configs are honored in base files after compaction 
in metadata table.
    */
@@ -1131,7 +1163,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     assertEquals(1, 
timeline.getCommitsTimeline().filterCompletedInstants().countInstants());
     assertEquals(3, mdtTimeline.countInstants());
     assertEquals(3, 
mdtTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
-    String mdtInitCommit2 = 
HoodieTableMetadataUtil.createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, 1);
+    String mdtInitCommit2 = 
mdtTimeline.getCommitsTimeline().filterCompletedInstants().getInstants().get(1).getTimestamp();
     Pair<HoodieInstant, HoodieCommitMetadata> lastCommitMetadataWithValidData =
         mdtTimeline.getLastCommitMetadataWithValidData().get();
     String commit = lastCommitMetadataWithValidData.getLeft().getTimestamp();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
index 2641feab321..eb64be99769 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.FileCreateUtils;
-import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.config.HoodieArchivalConfig;
@@ -167,20 +166,19 @@ public class TestHoodieMetadataBootstrap extends 
TestHoodieMetadataBase {
     HoodieTableType tableType = COPY_ON_WRITE;
     init(tableType, false);
 
+    // In real production env, bootstrap action can only happen on empty table,
+    // otherwise we need to roll back the previous bootstrap first,
+    // see 'SparkBootstrapCommitActionExecutor.execute' for more details.
     doPreBootstrapWriteOperation(testTable, INSERT, "0000001");
     doPreBootstrapWriteOperation(testTable, "0000002");
     // add an inflight commit
     HoodieCommitMetadata inflightCommitMeta = 
testTable.doWriteOperation("00000007", UPSERT, emptyList(),
-        asList("p1", "p2"), 2, true, true);
+        asList("p1", "p2"), 2, false, true);
     // bootstrap and following validation should fail. bootstrap should not 
happen.
     bootstrapAndVerifyFailure();
 
     // once the commit is complete, metadata should get fully synced.
-    // in prod code path, SparkHoodieBackedTableMetadataWriter.create() will 
be called for every commit,
-    // which may not be the case here if we directly call 
HoodieBackedTableMetadataWriter.update()
-    // hence let's first move the commit to complete and invoke sync directly
-    ((HoodieMetadataTestTable) 
testTable).moveInflightCommitToComplete("00000007", inflightCommitMeta, true);
-    syncTableMetadata(writeConfig);
+    testTable.moveInflightCommitToComplete("00000007", inflightCommitMeta);
     validateMetadata(testTable);
   }
 
@@ -261,12 +259,8 @@ public class TestHoodieMetadataBootstrap extends 
TestHoodieMetadataBase {
     writeConfig = getWriteConfig(true, true);
     initWriteConfigAndMetatableWriter(writeConfig, true);
     syncTableMetadata(writeConfig);
-    try {
-      validateMetadata(testTable);
-      Assertions.fail("Should have failed");
-    } catch (IllegalStateException e) {
-      // expected
-    }
+    Assertions.assertThrows(Error.class, () -> validateMetadata(testTable),
+        "expected 6 lines, but only got 4");
   }
 
   private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws 
Exception {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 53241a34983..49351b463c2 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -38,6 +38,7 @@ import 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.LSMTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -1519,34 +1520,35 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
       List<HoodieInstant> metadataTableInstants = 
metadataTableMetaClient.getActiveTimeline()
           .getCommitsTimeline().filterCompletedInstants().getInstants();
 
+      final String mdtInitCommit = 
HoodieInstantTimeGenerator.instantTimePlusMillis(SOLO_COMMIT_TIMESTAMP, 0L);
       if (i == 1) {
-        // In the metadata table timeline, the first delta commit is 
"00000000000000"
+        // In the metadata table timeline, the first delta commit is 
"00000000000000000"
         assertEquals(i + 1, metadataTableInstants.size());
         assertTrue(metadataTableInstants.contains(
-            new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, SOLO_COMMIT_TIMESTAMP + "010")));
+            new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, mdtInitCommit)));
         assertTrue(metadataTableInstants.contains(
             new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, instants.get(0))));
       } else if (i == 2) {
         assertEquals(i - 1, metadataTableInstants.size());
         assertTrue(metadataTableInstants.contains(
-            new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, SOLO_COMMIT_TIMESTAMP + "010")));
+            new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, mdtInitCommit)));
         assertFalse(metadataTableInstants.contains(
             new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, instants.get(1))));
       } else if (i <= 9) {
-        // In the metadata table timeline, the first delta commit is 
"00000000000000"
+        // In the metadata table timeline, the first delta commit is 
"00000000000000000"
         // from metadata table init, delta commits 1 till 8 are added
         // later on without archival or compaction
         // rollback in DT will also trigger rollback in MDT
         assertEquals(i - 1, metadataTableInstants.size());
         assertTrue(metadataTableInstants.contains(
-            new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, SOLO_COMMIT_TIMESTAMP + "010")));
+            new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, mdtInitCommit)));
         // rolled back commits may not be present in MDT timeline [1]
         IntStream.range(3, i).forEach(j ->
             assertTrue(metadataTableInstants.contains(
                 new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, instants.get(j - 1)))));
       } else if (i == 10) {
         // i == 10
-        // The instant "00000000000010" was archived since it's less than
+        // The instant "00000000000000000" was archived since it's less than
         // the earliest commit on the dataset active timeline,
         // the dataset active timeline has instants:
         //   [7.commit, 8.commit, 9.commit, 10.commit]
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index e9bade9f842..070d4d0d325 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -72,6 +72,7 @@ import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -434,20 +435,22 @@ public class TestHoodieMergeOnReadTable extends 
SparkClientFunctionalTestHarness
       // Create a commit without metadata stats in metadata to test backwards 
compatibility
       HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
       String commitActionType = table.getMetaClient().getCommitActionType();
-      HoodieInstant instant = new HoodieInstant(State.REQUESTED, 
commitActionType, "000");
+      List<String> instants = new ArrayList<>();
+      String instant0 = metaClient.createNewInstantTime();
+      HoodieInstant instant = new HoodieInstant(State.REQUESTED, 
commitActionType, instant0);
       activeTimeline.createNewInstant(instant);
       activeTimeline.transitionRequestedToInflight(instant, Option.empty());
-      instant = new HoodieInstant(State.INFLIGHT, commitActionType, "000");
+      instant = new HoodieInstant(State.INFLIGHT, commitActionType, instant0);
       activeTimeline.saveAsComplete(instant, Option.empty());
 
-      String instantTime = "001";
-      client.startCommitWithTime(instantTime);
+      String instant1 = metaClient.createNewInstantTime();
+      client.startCommitWithTime(instant1);
 
-      List<HoodieRecord> records = dataGen.generateInserts(instantTime, 200);
+      List<HoodieRecord> records = dataGen.generateInserts(instant1, 200);
       JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
 
-      JavaRDD<WriteStatus> statuses = client.insert(writeRecords, instantTime);
-      assertTrue(client.commit(instantTime, statuses), "Commit should 
succeed");
+      JavaRDD<WriteStatus> statuses = client.insert(writeRecords, instant1);
+      assertTrue(client.commit(instant1, statuses), "Commit should succeed");
 
       // Read from commit file
       table = HoodieSparkTable.create(cfg, context());
@@ -462,11 +465,11 @@ public class TestHoodieMergeOnReadTable extends 
SparkClientFunctionalTestHarness
       }
       assertEquals(200, inserts);
 
-      instantTime = "002";
-      client.startCommitWithTime(instantTime);
-      records = dataGen.generateUpdates(instantTime, records);
+      String instant2 = metaClient.createNewInstantTime();
+      client.startCommitWithTime(instant2);
+      records = dataGen.generateUpdates(instant2, records);
       writeRecords = jsc().parallelize(records, 1);
-      statuses = client.upsert(writeRecords, instantTime);
+      statuses = client.upsert(writeRecords, instant2);
       //assertTrue(client.commit(instantTime, statuses), "Commit should 
succeed");
       inserts = 0;
       int upserts = 0;
@@ -480,7 +483,7 @@ public class TestHoodieMergeOnReadTable extends 
SparkClientFunctionalTestHarness
       assertEquals(0, inserts);
       assertEquals(200, upserts);
 
-      client.rollback(instantTime);
+      client.rollback(instant2);
 
       // Read from commit file
       table = HoodieSparkTable.create(cfg, context());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
index 80354195b72..174d79acb78 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
@@ -95,13 +95,18 @@ public class HoodieInstantTimeGenerator {
   }
 
   public static String instantTimePlusMillis(String timestamp, long 
milliseconds) {
+    final String timestampInMillis = fixInstantTimeCompatibility(timestamp);
     try {
-      String timestampInMillis = fixInstantTimeCompatibility(timestamp);
       LocalDateTime dt = LocalDateTime.parse(timestampInMillis, 
MILLIS_INSTANT_TIME_FORMATTER);
       ZoneId zoneId = HoodieTimelineTimeZone.UTC.equals(commitTimeZone) ? 
ZoneId.of("UTC") : ZoneId.systemDefault();
       return 
MILLIS_INSTANT_TIME_FORMATTER.format(dt.atZone(zoneId).toInstant().plusMillis(milliseconds).atZone(zoneId).toLocalDateTime());
     } catch (DateTimeParseException e) {
-      throw new HoodieException(e);
+      // To work with tests, that generate arbitrary timestamps, we need to 
pad the timestamp with 0s.
+      if (isValidInstantTime(timestamp)) {
+        return String.format("%0" + MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH + 
"d", Long.parseLong(timestamp) + milliseconds);
+      } else {
+        throw new HoodieException(e);
+      }
     }
   }
 
@@ -113,8 +118,8 @@ public class HoodieInstantTimeGenerator {
       return 
MILLIS_INSTANT_TIME_FORMATTER.format(dt.atZone(zoneId).toInstant().minusMillis(milliseconds).atZone(zoneId).toLocalDateTime());
     } catch (DateTimeParseException e) {
       // To work with tests, that generate arbitrary timestamps, we need to 
pad the timestamp with 0s.
-      if (isValidInstantTime(timestampInMillis)) {
-        return String.format("%0" + timestampInMillis.length() + "d", 
Long.parseLong(timestampInMillis) - milliseconds);
+      if (isValidInstantTime(timestamp)) {
+        return String.format("%0" + MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH + 
"d", Long.parseLong(timestamp) - milliseconds);
       } else {
         throw new HoodieException(e);
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 09e7be7e993..9e6e5b42975 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -166,9 +166,6 @@ public class HoodieTableMetadataUtil {
       DoubleWrapper.class, FloatWrapper.class, LongWrapper.class,
       StringWrapper.class, TimeMicrosWrapper.class, 
TimestampMicrosWrapper.class));
 
-  // This suffix and all after that are used for initialization of the various 
partitions. The unused suffixes lower than this value
-  // are reserved for future operations on the MDT.
-  private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; // 
corresponds to "010";
   // we have max of 4 partitions (FILES, COL_STATS, BLOOM, RLI)
   private static final List<String> 
VALID_PARTITION_INITIALIZATION_TIME_SUFFIXES = Arrays.asList("010", "011", 
"012", "013");
 
@@ -1625,16 +1622,6 @@ public class HoodieTableMetadataUtil {
     return fileId.endsWith("-0") ? fileId.length() - 2 : fileId.length();
   }
 
-  /**
-   * Create the timestamp for an index initialization operation on the 
metadata table.
-   * <p>
-   * Since many MDT partitions can be initialized one after other the offset 
parameter controls generating a
-   * unique timestamp.
-   */
-  public static String createIndexInitTimestamp(String timestamp, int offset) {
-    return String.format("%s%03d", timestamp, 
PARTITION_INITIALIZATION_TIME_SUFFIX + offset);
-  }
-
   /**
    * Estimates the file group count to use for a MDT partition.
    *
@@ -2008,7 +1995,7 @@ public class HoodieTableMetadataUtil {
     // Is this a hoodie partition
     private boolean isHoodiePartition = false;
 
-    public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos, 
String maxInstantTime) {
+    public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos, 
String maxInstantTime, Set<String> pendingDataInstants) {
       this.relativePath = relativePath;
 
       // Pre-allocate with the maximum length possible
@@ -2026,8 +2013,8 @@ public class HoodieTableMetadataUtil {
         } else if (FSUtils.isDataFile(pathInfo.getPath())) {
           // Regular HUDI data file (base file or log file)
           String dataFileCommitTime = 
FSUtils.getCommitTime(pathInfo.getPath().getName());
-          // Limit the file listings to files which were created before the 
maxInstant time.
-          if (HoodieTimeline.compareTimestamps(dataFileCommitTime, 
LESSER_THAN_OR_EQUALS, maxInstantTime)) {
+          // Limit the file listings to files which were created by successful 
commits before the maxInstant time.
+          if (!pendingDataInstants.contains(dataFileCommitTime) && 
HoodieTimeline.compareTimestamps(dataFileCommitTime, LESSER_THAN_OR_EQUALS, 
maxInstantTime)) {
             filenameToSizeMap.put(pathInfo.getPath().getName(), 
pathInfo.getLength());
           }
         }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index c3aa1eb2d98..2720aa42dd0 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -621,6 +621,13 @@ public class HoodieTestTable {
     return this;
   }
 
+  public HoodieTestTable withPartitionMetaFiles(List<String> partitionPaths) 
throws IOException {
+    for (String partitionPath : partitionPaths) {
+      FileCreateUtils.createPartitionMetaFile(basePath, partitionPath);
+    }
+    return this;
+  }
+
   public HoodieTestTable withMarkerFile(String partitionPath, String fileId, 
IOType ioType) throws IOException {
     createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, 
ioType);
     return this;
@@ -1127,6 +1134,7 @@ public class HoodieTestTable {
     }
     for (Map.Entry<String, List<Pair<String, Integer>>> entry : 
partitionToFilesNameLengthMap.entrySet()) {
       String partition = entry.getKey();
+      this.withPartitionMetaFiles(partition); // needed by the metadata table 
initialization.
       this.withBaseFilesInPartition(partition, 
testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition));
       if (MERGE_ON_READ.equals(metaClient.getTableType()) && 
UPSERT.equals(operationType)) {
         this.withLogFilesInPartition(partition, 
testTableState.getPartitionToLogFileInfoMap(commitTime).get(partition));
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 88b051facaf..b69dc94609b 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -123,7 +123,8 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
         partitionInfoList.add(new HoodieTableMetadataUtil.DirectoryInfo(
             p,
             
metaClient.getStorage().listDirectEntries(Arrays.asList(storagePath1, 
storagePath2)),
-            instant2));
+            instant2,
+            Collections.emptySet()));
       } catch (Exception e) {
         throw new RuntimeException(e);
       }

Reply via email to