This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 6ade85078b50e3c9ffea0b6a79ebe8cd35747daf
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Oct 20 11:22:13 2020 -0700

    [HUDI-1317] Fix initialization when Async jobs are scheduled.

    Logic is explained in the JIRA ticket.
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  2 -
 .../apache/hudi/metadata/HoodieMetadataWriter.java | 50 ++++++++++++++--------
 .../apache/hudi/metadata/TestHoodieMetadata.java   | 50 +++++++++++++++-------
 3 files changed, 68 insertions(+), 34 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d8d732f..b499ac9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -42,8 +42,6 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.StringReader;
-import java.io.StringWriter;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
index 936d294..c7eb33f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
@@ -58,9 +58,9 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineLayout;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
@@ -305,7 +305,16 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
     ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
 
     // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
-    Option<HoodieInstant> latestInstant = datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    // Otherwise, we use the timestamp of the instant which does not have any non-completed instants before it.
+    Option<HoodieInstant> latestInstant = Option.empty();
+    boolean foundNonComplete = false;
+    for (HoodieInstant instant : datasetMetaClient.getActiveTimeline().getInstants().collect(Collectors.toList())) {
+      if (!instant.isCompleted()) {
+        foundNonComplete = true;
+      } else if (!foundNonComplete) {
+        latestInstant = Option.of(instant);
+      }
+    }
     String createInstantTime = latestInstant.isPresent() ? latestInstant.get().getTimestamp() : SOLO_COMMIT_TIMESTAMP;
     LOG.info("Creating a new metadata table in " + metadataBasePath + " at instant " + createInstantTime);
 
@@ -320,7 +329,6 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
 
     // List all partitions in parallel and collect the files in them
     final String dbasePath = datasetBasePath;
-    final SerializableConfiguration serializedConf = new SerializableConfiguration(hadoopConf);
     int parallelism = Math.min(partitions.size(), jsc.defaultParallelism()) + 1; // +1 to prevent 0 parallelism
     JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
         .mapToPair(partition -> {
@@ -335,6 +343,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
     // Create a HoodieCommitMetadata with writeStats for all discovered files
     int[] stats = {0};
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
     partitionFileList.forEach(t -> {
       final String partition = t._1;
       try {
@@ -345,25 +354,32 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
         throw new HoodieMetadataException("Failed to check partition " + partition, e);
       }
 
+      // Filter the statuses to only include files which were created before or on createInstantTime
+      Arrays.stream(t._2).filter(status -> {
+        String filename = status.getPath().getName();
+        if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+          return false;
+        }
+        if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
+            createInstantTime)) {
+          return false;
+        }
+        return true;
+      }).forEach(status -> {
+        HoodieWriteStat writeStat = new HoodieWriteStat();
+        writeStat.setPath(partition + Path.SEPARATOR + status.getPath().getName());
+        writeStat.setPartitionPath(partition);
+        writeStat.setTotalWriteBytes(status.getLen());
+        metadata.addWriteStat(partition, writeStat);
+        stats[0] += 1;
+      });
+
       // If the partition has no files then create a writeStat with no file path
-      if (t._2.length == 0) {
+      if (metadata.getWriteStats(partition) == null) {
         HoodieWriteStat writeStat = new HoodieWriteStat();
         writeStat.setPartitionPath(partition);
         metadata.addWriteStat(partition, writeStat);
-      } else {
-        Arrays.stream(t._2).forEach(status -> {
-          String filename = status.getPath().getName();
-          if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
-            return;
-          }
-          HoodieWriteStat writeStat = new HoodieWriteStat();
-          writeStat.setPath(partition + Path.SEPARATOR + filename);
-          writeStat.setPartitionPath(partition);
-          writeStat.setTotalWriteBytes(status.getLen());
-          metadata.addWriteStat(partition, writeStat);
-        });
       }
-      stats[0] += t._2.length;
     });
 
     LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index 0c5839b..36a8c13 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -25,6 +25,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -38,7 +40,6 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.ClientUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieKey;
@@ -52,6 +53,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -82,16 +84,23 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
   private String metadataTableBasePath;
 
   public void init() throws IOException {
-    init(HoodieTableType.MERGE_ON_READ);
+    init(HoodieTableType.MERGE_ON_READ, true);
   }
 
   public void init(HoodieTableType tableType) throws IOException {
-    initDFS();
-    initSparkContexts("TestHoodieMetadata");
-    hadoopConf.addResource(dfs.getConf());
+    init(tableType, true);
+  }
+
+  public void init(HoodieTableType tableType, boolean useDFS) throws IOException {
     initPath();
-    dfs.mkdirs(new Path(basePath));
-    metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
+    initSparkContexts("TestHoodieMetadata");
+    initFileSystem();
+    if (useDFS) {
+      initDFS();
+      dfs.mkdirs(new Path(basePath));
+    }
+    initMetaClient();
+
     initTestDataGenerator();
 
     metadataTableBasePath = HoodieMetadataReader.getMetadataTableBasePath(basePath);
@@ -133,22 +142,33 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
    */
   @Test
   public void testOnlyValidPartitionsAdded() throws Exception {
-    init();
-
-    HoodieTestDataGenerator.writePartitionMetadata(dfs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
+    // This test requires local file system
+    init(HoodieTableType.MERGE_ON_READ, false);
 
     // Create an empty directory which is not a partition directory (lacks partition metadata)
     final String nonPartitionDirectory = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-nonpartition";
-    final Path nonPartitionPath = new Path(basePath, nonPartitionDirectory);
-    dfs.mkdirs(nonPartitionPath);
+    Files.createDirectories(Paths.get(basePath, nonPartitionDirectory));
+
+    // Create some commits
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    testTable.withPartitionMetaFiles("p1", "p2")
+        .addCommit("001").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10)
+        .addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10)
+        .addInflightCommit("003").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10);
 
     try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, true))) {
-      client.startCommitWithTime("001");
-      validateMetadata(client);
+      client.startCommitWithTime("005");
 
-      List<String> partitions = metadata(client).getAllPartitionPaths(dfs, basePath, true);
+      List<String> partitions = metadata(client).getAllPartitionPaths(dfs, basePath, false);
       assertFalse(partitions.contains(nonPartitionDirectory),
           "Must not contain the non-partition " + nonPartitionDirectory);
+      assertTrue(partitions.contains("p1"), "Must contain partition p1");
+      assertTrue(partitions.contains("p2"), "Must contain partition p2");
+
+      FileStatus[] statuses = metadata(client).getAllFilesInPartition(hadoopConf, basePath, new Path(basePath, "p1"));
+      assertTrue(statuses.length == 2);
+      statuses = metadata(client).getAllFilesInPartition(hadoopConf, basePath, new Path(basePath, "p2"));
+      assertTrue(statuses.length == 5);
     }
   }
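Note on the initialization change above: the scan deliberately stops extending latestInstant at the first non-completed instant, because an async job scheduled at that instant can still write files stamped with its (earlier) timestamp, and a metadata table bootstrapped past that point would never learn about them. Below is a minimal standalone sketch of that selection logic; the nested Instant record is a simplified stand-in for Hudi's HoodieInstant so the sketch runs without any Hudi dependency.

import java.util.List;
import java.util.Optional;

public class LatestSafeInstant {
  // Simplified stand-in for HoodieInstant: just a timestamp and a completed flag.
  record Instant(String timestamp, boolean completed) {}

  // Returns the latest completed instant with no non-completed instant before it.
  static Optional<Instant> latestSafeInstant(List<Instant> timeline) {
    Optional<Instant> latest = Optional.empty();
    boolean foundNonComplete = false;
    for (Instant instant : timeline) { // timeline is ordered by timestamp
      if (!instant.completed()) {
        foundNonComplete = true;       // everything after this point is unsafe
      } else if (!foundNonComplete) {
        latest = Optional.of(instant);
      }
    }
    return latest;
  }

  public static void main(String[] args) {
    // "002" is pending (e.g. a scheduled async job), so "003" must not be used
    // even though it is completed: files later written by "002" would be missed.
    List<Instant> timeline = List.of(
        new Instant("001", true),
        new Instant("002", false),
        new Instant("003", true));
    System.out.println(latestSafeInstant(timeline)
        .map(Instant::timestamp).orElse("SOLO_COMMIT_TIMESTAMP")); // prints 001
  }
}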
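The same cutoff drives the new file filter in the bootstrap listing: any file whose name encodes a commit time greater than createInstantTime is skipped, along with the partition metafile. A hedged standalone sketch of that filter follows; it assumes the base-file naming convention <fileId>_<writeToken>_<instantTime>.<ext> and fixed-width instant timestamps, with the simplified commitTimeOf standing in for FSUtils.getCommitTime.

import java.util.List;
import java.util.stream.Collectors;

public class FileTimestampFilter {
  // Assumed naming convention: <fileId>_<writeToken>_<instantTime>.<ext>
  // (simplified stand-in for FSUtils.getCommitTime).
  static String commitTimeOf(String filename) {
    String noExt = filename.substring(0, filename.lastIndexOf('.'));
    String[] parts = noExt.split("_");
    return parts[parts.length - 1];
  }

  // Keep only files written at or before createInstantTime. Instant times are
  // fixed-width digit strings, so lexicographic order matches chronological order.
  static List<String> filesVisibleAt(List<String> filenames, String createInstantTime) {
    return filenames.stream()
        .filter(f -> !f.equals(".hoodie_partition_metadata")) // skip partition metafile
        .filter(f -> commitTimeOf(f).compareTo(createInstantTime) <= 0)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> files = List.of(
        "f1_0-1-0_001.parquet",  // written by completed commit 001
        "f2_0-1-0_003.parquet",  // written by the pending commit 003
        ".hoodie_partition_metadata");
    System.out.println(filesVisibleAt(files, "002")); // [f1_0-1-0_001.parquet]
  }
}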
