This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6ade85078b50e3c9ffea0b6a79ebe8cd35747daf
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Oct 20 11:22:13 2020 -0700

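    [HUDI-1317] Fix metadata table initialization when async jobs are scheduled.
    
    The metadata table is now initialized at the latest completed instant that has
    no non-completed (pending) instant before it on the dataset timeline, and only
    files created at or before that instant are included in the initial commit.
    The full rationale is explained in the JIRA ticket.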
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  2 -
 .../apache/hudi/metadata/HoodieMetadataWriter.java | 50 ++++++++++++++--------
 .../apache/hudi/metadata/TestHoodieMetadata.java   | 50 +++++++++++++++-------
 3 files changed, 68 insertions(+), 34 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d8d732f..b499ac9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -42,8 +42,6 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.StringReader;
-import java.io.StringWriter;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
index 936d294..c7eb33f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
@@ -58,9 +58,9 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineLayout;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
@@ -305,7 +305,16 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
     ValidationUtils.checkState(enabled, "Metadata table cannot be initialized 
as it is not enabled");
 
     // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
-    Option<HoodieInstant> latestInstant = 
datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    // Otherwise, we use the timestamp of the instant which does not have any 
non-completed instants before it.
+    Option<HoodieInstant> latestInstant = Option.empty();
+    boolean foundNonComplete = false;
+    for (HoodieInstant instant : 
datasetMetaClient.getActiveTimeline().getInstants().collect(Collectors.toList()))
 {
+      if (!instant.isCompleted()) {
+        foundNonComplete = true;
+      } else if (!foundNonComplete) {
+        latestInstant = Option.of(instant);
+      }
+    }
     String createInstantTime = latestInstant.isPresent() ? 
latestInstant.get().getTimestamp() : SOLO_COMMIT_TIMESTAMP;
 
     LOG.info("Creating a new metadata table in " + metadataBasePath + " at 
instant " + createInstantTime);
@@ -320,7 +329,6 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
 
     // List all partitions in parallel and collect the files in them
     final String dbasePath = datasetBasePath;
-    final SerializableConfiguration serializedConf = new 
SerializableConfiguration(hadoopConf);
     int parallelism =  Math.min(partitions.size(), jsc.defaultParallelism()) + 
1; // +1 to prevent 0 parallelism
     JavaPairRDD<String, FileStatus[]> partitionFileListRDD = 
jsc.parallelize(partitions, parallelism)
         .mapToPair(partition -> {
@@ -335,6 +343,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
     // Create a HoodieCommitMetadata with writeStats for all discovered files
     int[] stats = {0};
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
     partitionFileList.forEach(t -> {
       final String partition = t._1;
       try {
@@ -345,25 +354,32 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
         throw new HoodieMetadataException("Failed to check partition " + 
partition, e);
       }
 
+      // Filter the statuses to only include files which were created before 
or on createInstantTime
+      Arrays.stream(t._2).filter(status -> {
+        String filename = status.getPath().getName();
+        if 
(filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+          return false;
+        }
+        if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), 
HoodieTimeline.GREATER_THAN,
+            createInstantTime)) {
+          return false;
+        }
+        return true;
+      }).forEach(status -> {
+        HoodieWriteStat writeStat = new HoodieWriteStat();
+        writeStat.setPath(partition + Path.SEPARATOR + 
status.getPath().getName());
+        writeStat.setPartitionPath(partition);
+        writeStat.setTotalWriteBytes(status.getLen());
+        metadata.addWriteStat(partition, writeStat);
+        stats[0] += 1;
+      });
+
       // If the partition has no files then create a writeStat with no file 
path
-      if (t._2.length == 0) {
+      if (metadata.getWriteStats(partition) == null) {
         HoodieWriteStat writeStat = new HoodieWriteStat();
         writeStat.setPartitionPath(partition);
         metadata.addWriteStat(partition, writeStat);
-      } else {
-        Arrays.stream(t._2).forEach(status -> {
-          String filename = status.getPath().getName();
-          if 
(filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
-            return;
-          }
-          HoodieWriteStat writeStat = new HoodieWriteStat();
-          writeStat.setPath(partition + Path.SEPARATOR + filename);
-          writeStat.setPartitionPath(partition);
-          writeStat.setTotalWriteBytes(status.getLen());
-          metadata.addWriteStat(partition, writeStat);
-        });
       }
-      stats[0] += t._2.length;
     });
 
     LOG.info("Committing " + partitionFileList.size() + " partitions and " + 
stats[0] + " files to metadata");
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index 0c5839b..36a8c13 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -25,6 +25,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -38,7 +40,6 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.ClientUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieKey;
@@ -52,6 +53,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -82,16 +84,23 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
   private String metadataTableBasePath;
 
   public void init() throws IOException {
-    init(HoodieTableType.MERGE_ON_READ);
+    init(HoodieTableType.MERGE_ON_READ, true);
   }
 
   public void init(HoodieTableType tableType) throws IOException {
-    initDFS();
-    initSparkContexts("TestHoodieMetadata");
-    hadoopConf.addResource(dfs.getConf());
+    init(tableType, true);
+  }
+
+  public void init(HoodieTableType tableType, boolean useDFS) throws 
IOException {
     initPath();
-    dfs.mkdirs(new Path(basePath));
-    metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
+    initSparkContexts("TestHoodieMetadata");
+    initFileSystem();
+    if (useDFS) {
+      initDFS();
+      dfs.mkdirs(new Path(basePath));
+    }
+    initMetaClient();
+
     initTestDataGenerator();
 
     metadataTableBasePath = 
HoodieMetadataReader.getMetadataTableBasePath(basePath);
@@ -133,22 +142,33 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
    */
   @Test
   public void testOnlyValidPartitionsAdded() throws Exception {
-    init();
-
-    HoodieTestDataGenerator.writePartitionMetadata(dfs, 
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
+    // This test requires local file system
+    init(HoodieTableType.MERGE_ON_READ, false);
 
     // Create an empty directory which is not a partition directory (lacks 
partition metadata)
     final String nonPartitionDirectory = 
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-nonpartition";
-    final Path nonPartitionPath = new Path(basePath, nonPartitionDirectory);
-    dfs.mkdirs(nonPartitionPath);
+    Files.createDirectories(Paths.get(basePath, nonPartitionDirectory));
+
+    // Create some commits
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    testTable.withPartitionMetaFiles("p1", "p2")
+             .addCommit("001").withBaseFilesInPartition("p1", 
10).withBaseFilesInPartition("p2", 10, 10)
+             .addCommit("002").withBaseFilesInPartition("p1", 
10).withBaseFilesInPartition("p2", 10, 10, 10)
+             .addInflightCommit("003").withBaseFilesInPartition("p1", 
10).withBaseFilesInPartition("p2", 10);
 
     try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, 
getWriteConfig(true, true))) {
-      client.startCommitWithTime("001");
-      validateMetadata(client);
+      client.startCommitWithTime("005");
 
-      List<String> partitions = metadata(client).getAllPartitionPaths(dfs, 
basePath, true);
+      List<String> partitions = metadata(client).getAllPartitionPaths(dfs, 
basePath, false);
       assertFalse(partitions.contains(nonPartitionDirectory),
           "Must not contain the non-partition " + nonPartitionDirectory);
+      assertTrue(partitions.contains("p1"), "Must contain partition p1");
+      assertTrue(partitions.contains("p2"), "Must contain partition p2");
+
+      FileStatus[] statuses = 
metadata(client).getAllFilesInPartition(hadoopConf, basePath, new 
Path(basePath, "p1"));
+      assertTrue(statuses.length == 2);
+      statuses = metadata(client).getAllFilesInPartition(hadoopConf, basePath, 
new Path(basePath, "p2"));
+      assertTrue(statuses.length == 5);
     }
   }
 

Reply via email to