This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4b94529aafd45b608c6d1ab13376216144b934f6 Author: Ryan Pifer <[email protected]> AuthorDate: Thu Dec 31 08:57:13 2020 -0800 [HUDI-1325] [RFC-15] Merge updates of unsynced instants to metadata table (apache#2342) [RFC-15] Fix partition key in metadata table when bootstrapping from file system (apache#2387) Co-authored-by: Ryan Pifer <[email protected]> --- .../metadata/HoodieBackedTableMetadataWriter.java | 256 ++--------------- .../java/org/apache/hudi/table/HoodieTable.java | 2 +- .../hudi/client/TestCompactionAdminClient.java | 6 + ...Metadata.java => TestHoodieBackedMetadata.java} | 152 ++++++++-- .../hudi/table/upgrade/TestUpgradeDowngrade.java | 6 + .../hudi/testutils/HoodieClientTestHarness.java | 7 +- ...edTableMetadata.java => BaseTableMetadata.java} | 272 ++++-------------- .../hudi/metadata/HoodieBackedTableMetadata.java | 227 +-------------- .../HoodieMetadataMergedInstantRecordScanner.java | 115 ++++++++ .../hudi/metadata/HoodieTableMetadataUtil.java | 311 +++++++++++++++++++++ .../apache/hudi/functional/TestCOWDataSource.scala | 4 +- 11 files changed, 677 insertions(+), 681 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index ed24980..823e70c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -39,9 +39,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; -import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -49,7 +47,6 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieMetricsConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; @@ -61,18 +58,14 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX; -import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP; /** @@ -211,7 +204,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta return metadataWriteConfig; } - public HoodieTableMetadata metadata() { + public HoodieBackedTableMetadata metadata() { return metadata; } @@ -340,7 +333,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta if (p.getRight().length > filesInDir.size()) { // Is a partition. Add all data files to result. - partitionToFileStatus.put(p.getLeft().getName(), filesInDir); + String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetMetaClient.getBasePath()), p.getLeft()); + partitionToFileStatus.put(partitionName, filesInDir); } else { // Add sub-dirs to the queue pathsToList.addAll(Arrays.stream(p.getRight()) @@ -374,35 +368,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline(); for (HoodieInstant instant : instantsToSync) { LOG.info("Syncing instant " + instant + " to metadata table"); - ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced."); - - switch (instant.getAction()) { - case HoodieTimeline.CLEAN_ACTION: - HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, instant); - update(cleanMetadata, instant.getTimestamp()); - break; - case HoodieTimeline.DELTA_COMMIT_ACTION: - case HoodieTimeline.COMMIT_ACTION: - case HoodieTimeline.COMPACTION_ACTION: - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); - update(commitMetadata, instant.getTimestamp()); - break; - case HoodieTimeline.ROLLBACK_ACTION: - HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata( - timeline.getInstantDetails(instant).get()); - update(rollbackMetadata, instant.getTimestamp()); - break; - case HoodieTimeline.RESTORE_ACTION: - HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata( - timeline.getInstantDetails(instant).get()); - update(restoreMetadata, instant.getTimestamp()); - break; - case HoodieTimeline.SAVEPOINT_ACTION: - // Nothing to be done here - break; - default: - throw new HoodieException("Unknown type of action " + instant.getAction()); + + Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, metadata.getSyncedInstantTime()); + if (records.isPresent()) { + commit(records.get(), MetadataPartitionType.FILES.partitionPath(), instant.getTimestamp()); } } // re-init the table metadata, for any future writes. @@ -420,44 +389,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta */ @Override public void update(HoodieCommitMetadata commitMetadata, String instantTime) { - if (!enabled) { - return; + if (enabled) { + List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime); + commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); } - - List<HoodieRecord> records = new LinkedList<>(); - List<String> allPartitions = new LinkedList<>(); - commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> { - final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName; - allPartitions.add(partition); - - Map<String, Long> newFiles = new HashMap<>(writeStats.size()); - writeStats.forEach(hoodieWriteStat -> { - String pathWithPartition = hoodieWriteStat.getPath(); - if (pathWithPartition == null) { - // Empty partition - LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat); - return; - } - - int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1; - String filename = pathWithPartition.substring(offset); - ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata"); - newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes()); - }); - - // New files added to a partition - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord( - partition, Option.of(newFiles), Option.empty()); - records.add(record); - }); - - // New partitions created - HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions)); - records.add(record); - - LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType() - + ". #partitions_updated=" + records.size()); - commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); } /** @@ -468,26 +403,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta */ @Override public void update(HoodieCleanerPlan cleanerPlan, String instantTime) { - if (!enabled) { - return; + if (enabled) { + List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(cleanerPlan, instantTime); + commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); } - - List<HoodieRecord> records = new LinkedList<>(); - int[] fileDeleteCount = {0}; - cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> { - fileDeleteCount[0] += deletedPathInfo.size(); - - // Files deleted from a partition - List<String> deletedFilenames = deletedPathInfo.stream().map(p -> new Path(p.getFilePath()).getName()) - .collect(Collectors.toList()); - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), - Option.of(deletedFilenames)); - records.add(record); - }); - - LOG.info("Updating at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size() - + ", #files_deleted=" + fileDeleteCount[0]); - commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); } /** @@ -498,26 +417,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta */ @Override public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { - if (!enabled) { - return; + if (enabled) { + List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime); + commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); } - - List<HoodieRecord> records = new LinkedList<>(); - int[] fileDeleteCount = {0}; - - cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { - // Files deleted from a partition - List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles(); - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), - Option.of(new ArrayList<>(deletedFiles))); - - records.add(record); - fileDeleteCount[0] += deletedFiles.size(); - }); - - LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size() - + ", #files_deleted=" + fileDeleteCount[0]); - commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); } /** @@ -528,16 +431,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta */ @Override public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { - if (!enabled) { - return; + if (enabled) { + List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(restoreMetadata, instantTime, metadata.getSyncedInstantTime()); + commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); } - - Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>(); - Map<String, List<String>> partitionToDeletedFiles = new HashMap<>(); - restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> { - rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles)); - }); - commitRollback(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore"); } /** @@ -548,119 +445,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta */ @Override public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) { - if (!enabled) { - return; + if (enabled) { + List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(rollbackMetadata, instantTime, metadata.getSyncedInstantTime()); + commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); } - - Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>(); - Map<String, List<String>> partitionToDeletedFiles = new HashMap<>(); - processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles); - commitRollback(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback"); - } - - /** - * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}. - * - * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This - * function will extract this change file for each partition. - * - * @param rollbackMetadata {@code HoodieRollbackMetadata} - * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition. - * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes. - */ - private void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata, - Map<String, List<String>> partitionToDeletedFiles, - Map<String, Map<String, Long>> partitionToAppendedFiles) { - rollbackMetadata.getPartitionMetadata().values().forEach(pm -> { - final String partition = pm.getPartitionPath(); - - if (!pm.getSuccessDeleteFiles().isEmpty()) { - if (!partitionToDeletedFiles.containsKey(partition)) { - partitionToDeletedFiles.put(partition, new ArrayList<>()); - } - - // Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles() - List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName()) - .collect(Collectors.toList()); - partitionToDeletedFiles.get(partition).addAll(deletedFiles); - } - - if (!pm.getAppendFiles().isEmpty()) { - if (!partitionToAppendedFiles.containsKey(partition)) { - partitionToAppendedFiles.put(partition, new HashMap<>()); - } - - // Extract appended file name from the absolute paths saved in getAppendFiles() - pm.getAppendFiles().forEach((path, size) -> { - partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> { - return size + oldSize; - }); - }); - } - }); - } - - /** - * Create file delete records and commit. - * - * @param partitionToDeletedFiles {@code Map} of partitions and the deleted files - * @param instantTime Timestamp at which the deletes took place - * @param operation Type of the operation which caused the files to be deleted - */ - private void commitRollback(Map<String, List<String>> partitionToDeletedFiles, - Map<String, Map<String, Long>> partitionToAppendedFiles, String instantTime, - String operation) { - List<HoodieRecord> records = new LinkedList<>(); - int[] fileChangeCount = {0, 0}; // deletes, appends - - partitionToDeletedFiles.forEach((partition, deletedFiles) -> { - // Rollbacks deletes instants from timeline. The instant being rolled-back may not have been synced to the - // metadata table. Hence, the deleted filed need to be checked against the metadata. - try { - FileStatus[] existingStatuses = metadata.fetchAllFilesInPartition(new Path(metadata.getDatasetBasePath(), partition)); - Set<String> currentFiles = - Arrays.stream(existingStatuses).map(s -> s.getPath().getName()).collect(Collectors.toSet()); - - int origCount = deletedFiles.size(); - deletedFiles.removeIf(f -> !currentFiles.contains(f)); - if (deletedFiles.size() != origCount) { - LOG.warn("Some Files to be deleted as part of " + operation + " at " + instantTime + " were not found in the " - + " metadata for partition " + partition - + ". To delete = " + origCount + ", found=" + deletedFiles.size()); - } - - fileChangeCount[0] += deletedFiles.size(); - - Option<Map<String, Long>> filesAdded = Option.empty(); - if (partitionToAppendedFiles.containsKey(partition)) { - filesAdded = Option.of(partitionToAppendedFiles.remove(partition)); - } - - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded, - Option.of(new ArrayList<>(deletedFiles))); - records.add(record); - } catch (IOException e) { - throw new HoodieMetadataException("Failed to commit rollback deletes at instant " + instantTime, e); - } - }); - - partitionToAppendedFiles.forEach((partition, appendedFileMap) -> { - fileChangeCount[1] += appendedFileMap.size(); - - // Validate that no appended file has been deleted - ValidationUtils.checkState( - !appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())), - "Rollback file cannot both be appended and deleted"); - - // New files added to a partition - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap), - Option.empty()); - records.add(record); - }); - - LOG.info("Updating at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size() - + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]); - commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index b268512..d56e6e7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -277,7 +277,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem private SyncableFileSystemView getFileSystemViewInternal(HoodieTimeline timeline) { if (config.useFileListingMetadata()) { FileSystemViewStorageConfig viewConfig = config.getViewStorageConfig(); - return new HoodieMetadataFileSystemView(metaClient, this.metadata, timeline, viewConfig.isIncrementalTimelineSyncEnabled()); + return new HoodieMetadataFileSystemView(metaClient, this.metadata(), timeline, viewConfig.isIncrementalTimelineSyncEnabled()); } else { return getViewManager().getFileSystemView(metaClient); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java index 03328dd..e59a950 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java @@ -37,6 +37,7 @@ import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -70,6 +71,11 @@ public class TestCompactionAdminClient extends HoodieClientTestBase { client = new CompactionAdminClient(context, basePath); } + @AfterEach + public void cleanUp() throws Exception { + cleanupResources(); + } + @Test public void testUnscheduleCompactionPlan() throws Exception { int numEntriesPerInstant = 10; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java similarity index 87% rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java index b9c3511..313eda2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java @@ -79,8 +79,8 @@ import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -public class TestHoodieFsMetadata extends HoodieClientTestHarness { - private static final Logger LOG = LogManager.getLogger(TestHoodieFsMetadata.class); +public class TestHoodieBackedMetadata extends HoodieClientTestHarness { + private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class); @TempDir public java.nio.file.Path tempFolder; @@ -95,7 +95,7 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness { initSparkContexts("TestHoodieMetadata"); initFileSystem(); fs.mkdirs(new Path(basePath)); - initMetaClient(); + initMetaClient(tableType); initTestDataGenerator(); metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); @@ -371,7 +371,41 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness { client.syncTableMetadata(); validateMetadata(client); } + } + + /** + * Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op + * occurs to metadata. + * @throws Exception + */ + @Test + public void testRollbackUnsyncedCommit() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { + // Initialize table with metadata + String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20); + client.startCommitWithTime(newCommitTime); + List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + validateMetadata(client); + } + + String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { + // Commit with metadata disabled + client.startCommitWithTime(newCommitTime); + List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10); + List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + client.rollback(newCommitTime); + } + try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true))) { + validateMetadata(client); + } } /** @@ -637,14 +671,93 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness { } /** + * Test when reading from metadata table which is out of sync with dataset that results are still consistent. + */ + // @ParameterizedTest + // @EnumSource(HoodieTableType.class) + @Test + public void testMetadataOutOfSync() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + SparkRDDWriteClient unsyncedClient = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true)); + + // Enable metadata so table is initialized + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { + // Perform Bulk Insert + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20); + client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + } + + // Perform commit operations with metadata disabled + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { + // Perform Insert + String newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20); + client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); + + // Perform Upsert + newCommitTime = "003"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUniqueUpdates(newCommitTime, 20); + client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + + // Compaction + if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { + newCommitTime = "004"; + client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); + client.compact(newCommitTime); + } + } + + assertFalse(metadata(unsyncedClient).isInSync()); + validateMetadata(unsyncedClient); + + // Perform clean operation with metadata disabled + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { + // One more commit needed to trigger clean so upsert and compact + String newCommitTime = "005"; + client.startCommitWithTime(newCommitTime); + List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 20); + client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + + if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { + newCommitTime = "006"; + client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); + client.compact(newCommitTime); + } + + // Clean + newCommitTime = "007"; + client.clean(newCommitTime); + } + + assertFalse(metadata(unsyncedClient).isInSync()); + validateMetadata(unsyncedClient); + + // Perform restore with metadata disabled + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { + client.restoreToInstant("004"); + } + + assertFalse(metadata(unsyncedClient).isInSync()); + validateMetadata(unsyncedClient); + } + + + /** * Validate the metadata tables contents to ensure it matches what is on the file system. * * @throws IOException */ private void validateMetadata(SparkRDDWriteClient client) throws IOException { HoodieWriteConfig config = client.getConfig(); - HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client); - assertNotNull(metadataWriter, "MetadataWriter should have been initialized"); + + HoodieBackedTableMetadata tableMetadata = metadata(client); + assertNotNull(tableMetadata, "MetadataReader should have been initialized"); if (!config.useFileListingMetadata()) { return; } @@ -652,17 +765,9 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness { HoodieTimer timer = new HoodieTimer().startTimer(); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - // Validate write config for metadata table - HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig(); - assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table"); - assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table"); - - // Metadata table should be in sync with the dataset - assertTrue(metadata(client).isInSync()); - // Partitions should match List<String> fsPartitions = FSUtils.getAllFoldersWithPartitionMetaFile(fs, basePath); - List<String> metadataPartitions = metadataWriter.metadata().getAllPartitionPaths(); + List<String> metadataPartitions = tableMetadata.getAllPartitionPaths(); Collections.sort(fsPartitions); Collections.sort(metadataPartitions); @@ -684,7 +789,7 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness { partitionPath = new Path(basePath, partition); } FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(fs, partitionPath); - FileStatus[] metaStatuses = metadataWriter.metadata().getAllFilesInPartition(partitionPath); + FileStatus[] metaStatuses = tableMetadata.getAllFilesInPartition(partitionPath); List<String> fsFileNames = Arrays.stream(fsStatuses) .map(s -> s.getPath().getName()).collect(Collectors.toList()); List<String> metadataFilenames = Arrays.stream(metaStatuses) @@ -705,9 +810,9 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness { // FileSystemView should expose the same data List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList()); - fileGroups.forEach(g -> LogManager.getLogger(TestHoodieFsMetadata.class).info(g)); - fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieFsMetadata.class).info(b))); - fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieFsMetadata.class).info(s))); + fileGroups.forEach(g -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(g)); + fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(b))); + fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(s))); long numFiles = fileGroups.stream() .mapToLong(g -> g.getAllBaseFiles().count() + g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum()) @@ -720,10 +825,17 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness { } }); - HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath); + HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client); + assertNotNull(metadataWriter, "MetadataWriter should have been initialized"); + + // Validate write config for metadata table + HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig(); + assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table"); + assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table"); // Metadata table should be in sync with the dataset - assertTrue(metadataWriter.metadata().isInSync()); + assertTrue(metadata(client).isInSync()); + HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath); // Metadata table is MOR assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index b8e02b9..6a292f5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -46,6 +46,7 @@ import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -91,6 +92,11 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { initDFSMetaClient(); } + @AfterEach + public void cleanUp() throws Exception { + cleanupResources(); + } + @Test public void testLeftOverUpdatedPropFileCleanup() throws IOException { testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 9fa1c47..e6523af 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -24,6 +24,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -204,6 +205,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im * @throws IOException */ protected void initMetaClient() throws IOException { + initMetaClient(getTableType()); + } + + protected void initMetaClient(HoodieTableType tableType) throws IOException { if (basePath == null) { throw new IllegalStateException("The base path has not been initialized."); } @@ -212,7 +217,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im throw new IllegalStateException("The Spark context has not been initialized."); } - metaClient = HoodieTestUtils.init(context.getHadoopConf().get(), basePath, getTableType()); + metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java similarity index 50% copy from hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java copy to hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 4858e6e..f62d9d8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -1,3 +1,4 @@ + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,105 +19,64 @@ package org.apache.hudi.metadata; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.metrics.Registry; -import org.apache.hudi.common.model.FileSlice; -import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.SpillableMapUtils; -import org.apache.hudi.common.util.ValidationUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + import org.apache.hudi.exception.HoodieMetadataException; -import org.apache.hudi.exception.TableNotFoundException; -import org.apache.hudi.io.storage.HoodieFileReader; -import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -/** - * Table metadata provided by an internal DFS backed Hudi metadata table. - * - * If the metadata table does not exist, RPC calls are used to retrieve file listings from the file system. - * No updates are applied to the table and it is not synced. - */ -public class HoodieBackedTableMetadata implements HoodieTableMetadata { +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; - private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class); - private static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024; - private static final int BUFFER_SIZE = 10 * 1024 * 1024; +public abstract class BaseTableMetadata implements HoodieTableMetadata { - private final SerializableConfiguration hadoopConf; - private final String datasetBasePath; - private final String metadataBasePath; - private final Option<HoodieMetadataMetrics> metrics; - private HoodieTableMetaClient metaClient; + private static final Logger LOG = LogManager.getLogger(BaseTableMetadata.class); + + static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024; + static final int BUFFER_SIZE = 10 * 1024 * 1024; + + protected final SerializableConfiguration hadoopConf; + protected final String datasetBasePath; + protected boolean enabled; + protected final Option<HoodieMetadataMetrics> metrics; - private boolean enabled; private final boolean validateLookups; private final boolean assumeDatePartitioning; - // Directory used for Spillable Map when merging records - private final String spillableMapDirectory; - // Readers for the base and log file which store the metadata - private transient HoodieFileReader<GenericRecord> baseFileReader; - private transient HoodieMetadataMergedLogRecordScanner logRecordScanner; - - public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory, - boolean enabled, boolean validateLookups, boolean assumeDatePartitioning) { - this(conf, datasetBasePath, spillableMapDirectory, enabled, validateLookups, false, assumeDatePartitioning); - } + // Directory used for Spillable Map when merging records + protected final String spillableMapDirectory; + private transient HoodieMetadataMergedInstantRecordScanner timelineRecordScanner; - public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory, - boolean enabled, boolean validateLookups, boolean enableMetrics, - boolean assumeDatePartitioning) { - this.hadoopConf = new SerializableConfiguration(conf); + protected BaseTableMetadata(Configuration hadoopConf, String datasetBasePath, String spillableMapDirectory, + boolean enabled, boolean validateLookups, boolean enableMetrics, + boolean assumeDatePartitioning) { + this.hadoopConf = new SerializableConfiguration(hadoopConf); this.datasetBasePath = datasetBasePath; - this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath); - this.validateLookups = validateLookups; this.spillableMapDirectory = spillableMapDirectory; + this.enabled = enabled; + this.validateLookups = validateLookups; this.assumeDatePartitioning = assumeDatePartitioning; - if (enabled) { - try { - this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath); - } catch (TableNotFoundException e) { - LOG.warn("Metadata table was not found at path " + metadataBasePath); - this.enabled = false; - } catch (Exception e) { - LOG.error("Failed to initialize metadata table at path " + metadataBasePath, e); - this.enabled = false; - } - } else { - LOG.info("Metadata table is disabled."); - } - if (enableMetrics) { this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata"))); } else { @@ -134,8 +94,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata { * */ @Override - public List<String> getAllPartitionPaths() - throws IOException { + public List<String> getAllPartitionPaths() throws IOException { if (enabled) { try { return fetchAllPartitionPaths(); @@ -163,7 +122,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata { try { return fetchAllFilesInPartition(partitionPath); } catch (Exception e) { - LOG.error("Failed to retrive files in partition " + partitionPath + " from metadata", e); + LOG.error("Failed to retrieve files in partition " + partitionPath + " from metadata", e); } } @@ -247,6 +206,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata { timer.startTimer(); // Ignore partition metadata file + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath); FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath, p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer())); @@ -281,165 +241,53 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata { * @param key The key of the record */ private Option<HoodieRecord<HoodieMetadataPayload>> getMergedRecordByKey(String key) throws IOException { - openBaseAndLogFiles(); - - // Retrieve record from base file - HoodieRecord<HoodieMetadataPayload> hoodieRecord = null; - if (baseFileReader != null) { - HoodieTimer timer = new HoodieTimer().startTimer(); - Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key); - if (baseRecord.isPresent()) { - hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), - metaClient.getTableConfig().getPayloadClass()); - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer())); - } - } - // Retrieve record from log file - Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord = logRecordScanner.getRecordByKey(key); - if (logHoodieRecord.isPresent()) { - if (hoodieRecord != null) { - // Merge the payloads - HoodieRecordPayload mergedPayload = logHoodieRecord.get().getData().preCombine(hoodieRecord.getData()); - hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload); + Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord; + openTimelineScanner(); + + Option<HoodieRecord<HoodieMetadataPayload>> metadataHoodieRecord = getRecordByKeyFromMetadata(key); + // Retrieve record from unsynced timeline instants + Option<HoodieRecord<HoodieMetadataPayload>> timelineHoodieRecord = timelineRecordScanner.getRecordByKey(key); + if (timelineHoodieRecord.isPresent()) { + if (metadataHoodieRecord.isPresent()) { + HoodieRecordPayload mergedPayload = timelineHoodieRecord.get().getData().preCombine(metadataHoodieRecord.get().getData()); + mergedRecord = Option.of(new HoodieRecord(metadataHoodieRecord.get().getKey(), mergedPayload)); } else { - hoodieRecord = logHoodieRecord.get(); + mergedRecord = timelineHoodieRecord; } + } else { + mergedRecord = metadataHoodieRecord; } - - return Option.ofNullable(hoodieRecord); + return mergedRecord; } - /** - * Open readers to the base and log files. - */ - private synchronized void openBaseAndLogFiles() throws IOException { - if (logRecordScanner != null) { + protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) throws IOException; + + private void openTimelineScanner() throws IOException { + if (timelineRecordScanner != null) { // Already opened return; } - HoodieTimer timer = new HoodieTimer().startTimer(); - - // Metadata is in sync till the latest completed instant on the dataset HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath); - String latestInstantTime = datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant() - .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); - - // Find the latest file slice - HoodieTimeline timeline = metaClient.reloadActiveTimeline(); - HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline()); - List<FileSlice> latestSlices = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList()); - ValidationUtils.checkArgument(latestSlices.size() == 1); - - // If the base file is present then create a reader - Option<HoodieBaseFile> basefile = latestSlices.get(0).getBaseFile(); - if (basefile.isPresent()) { - String basefilePath = basefile.get().getPath(); - baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath)); - LOG.info("Opened metadata base file from " + basefilePath + " at instant " + basefile.get().getCommitTime()); - } - - // Open the log record scanner using the log files from the latest file slice - List<String> logFilePaths = latestSlices.get(0).getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) - .map(o -> o.getPath().toString()) - .collect(Collectors.toList()); - - Option<HoodieInstant> lastInstant = timeline.filterCompletedInstants().lastInstant(); - String latestMetaInstantTimestamp = lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); - - // Load the schema + List<HoodieInstant> unsyncedInstants = findInstantsToSync(datasetMetaClient); Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); - - logRecordScanner = - new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath, - logFilePaths, schema, latestMetaInstantTimestamp, MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE, - spillableMapDirectory, null); - - LOG.info("Opened metadata log files from " + logFilePaths + " at instant " + latestInstantTime - + "(dataset instant=" + latestInstantTime + ", metadata instant=" + latestMetaInstantTimestamp + ")"); - - metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer())); - } - - public void closeReaders() { - if (baseFileReader != null) { - baseFileReader.close(); - baseFileReader = null; - } - logRecordScanner = null; + timelineRecordScanner = + new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unsyncedInstants, getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null); } - /** - * Return {@code True} if all Instants from the dataset have been synced with the Metadata Table. - */ - @Override - public boolean isInSync() { - return enabled && findInstantsToSync().isEmpty(); - } - - private List<HoodieInstant> findInstantsToSync() { + protected List<HoodieInstant> findInstantsToSync() { HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath); return findInstantsToSync(datasetMetaClient); } - /** - * Return an ordered list of instants which have not been synced to the Metadata Table. - * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset - */ - protected List<HoodieInstant> findInstantsToSync(HoodieTableMetaClient datasetMetaClient) { - HoodieActiveTimeline metaTimeline = metaClient.reloadActiveTimeline(); - - // All instants on the data timeline, which are greater than the last instant on metadata timeline - // are candidates for sync. - Option<HoodieInstant> latestMetadataInstant = metaTimeline.filterCompletedInstants().lastInstant(); - ValidationUtils.checkArgument(latestMetadataInstant.isPresent(), - "At least one completed instant should exist on the metadata table, before syncing."); - String latestMetadataInstantTime = latestMetadataInstant.get().getTimestamp(); - HoodieDefaultTimeline candidateTimeline = datasetMetaClient.getActiveTimeline().findInstantsAfter(latestMetadataInstantTime, Integer.MAX_VALUE); - Option<HoodieInstant> earliestIncompleteInstant = candidateTimeline.filterInflightsAndRequested().firstInstant(); - - if (earliestIncompleteInstant.isPresent()) { - return candidateTimeline.filterCompletedInstants() - .findInstantsBefore(earliestIncompleteInstant.get().getTimestamp()) - .getInstants().collect(Collectors.toList()); - } else { - return candidateTimeline.filterCompletedInstants() - .getInstants().collect(Collectors.toList()); - } - } - - /** - * Return the timestamp of the latest compaction instant. - */ - @Override - public Option<String> getSyncedInstantTime() { - if (!enabled) { - return Option.empty(); - } + protected abstract List<HoodieInstant> findInstantsToSync(HoodieTableMetaClient datasetMetaClient); - HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); - return timeline.getDeltaCommitTimeline().filterCompletedInstants() - .lastInstant().map(HoodieInstant::getTimestamp); - } - - public boolean enabled() { - return enabled; - } - - public SerializableConfiguration getHadoopConf() { - return hadoopConf; - } - - public String getDatasetBasePath() { - return datasetBasePath; - } - - public HoodieTableMetaClient getMetaClient() { - return metaClient; + public boolean isInSync() { + return enabled && findInstantsToSync().isEmpty(); } - public Map<String, String> stats() { - return metrics.map(m -> m.getStats(true, metaClient, this)).orElse(new HashMap<>()); + protected void closeReaders() { + timelineRecordScanner = null; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 4858e6e..65c3244 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -18,28 +18,12 @@ package org.apache.hudi.metadata; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.common.config.SerializableConfiguration; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -52,37 +36,36 @@ import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + /** * Table metadata provided by an internal DFS backed Hudi metadata table. * * If the metadata table does not exist, RPC calls are used to retrieve file listings from the file system. * No updates are applied to the table and it is not synced. */ -public class HoodieBackedTableMetadata implements HoodieTableMetadata { +public class HoodieBackedTableMetadata extends BaseTableMetadata { private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class); - private static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024; - private static final int BUFFER_SIZE = 10 * 1024 * 1024; - private final SerializableConfiguration hadoopConf; - private final String datasetBasePath; private final String metadataBasePath; - private final Option<HoodieMetadataMetrics> metrics; private HoodieTableMetaClient metaClient; - private boolean enabled; - private final boolean validateLookups; - private final boolean assumeDatePartitioning; - // Directory used for Spillable Map when merging records - private final String spillableMapDirectory; - // Readers for the base and log file which store the metadata private transient HoodieFileReader<GenericRecord> baseFileReader; private transient HoodieMetadataMergedLogRecordScanner logRecordScanner; @@ -95,14 +78,8 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata { public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory, boolean enabled, boolean validateLookups, boolean enableMetrics, boolean assumeDatePartitioning) { - this.hadoopConf = new SerializableConfiguration(conf); - this.datasetBasePath = datasetBasePath; + super(conf, datasetBasePath, spillableMapDirectory, enabled, validateLookups, enableMetrics, assumeDatePartitioning); this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath); - this.validateLookups = validateLookups; - this.spillableMapDirectory = spillableMapDirectory; - this.enabled = enabled; - this.assumeDatePartitioning = assumeDatePartitioning; - if (enabled) { try { this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath); @@ -116,171 +93,10 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata { } else { LOG.info("Metadata table is disabled."); } - - if (enableMetrics) { - this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata"))); - } else { - this.metrics = Option.empty(); - } - } - - /** - * Return the list of partitions in the dataset. - * - * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the list of - * partitions is retrieved directly from the underlying {@code FileSystem}. - * - * On any errors retrieving the listing from the metadata, defaults to using the file system listings. - * - */ - @Override - public List<String> getAllPartitionPaths() - throws IOException { - if (enabled) { - try { - return fetchAllPartitionPaths(); - } catch (Exception e) { - LOG.error("Failed to retrieve list of partition from metadata", e); - } - } - return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning).getAllPartitionPaths(); } - /** - * Return the list of files in a partition. - * - * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the list of - * partitions is retrieved directly from the underlying {@code FileSystem}. - * - * On any errors retrieving the listing from the metadata, defaults to using the file system listings. - * - * @param partitionPath The absolute path of the partition to list - */ @Override - public FileStatus[] getAllFilesInPartition(Path partitionPath) - throws IOException { - if (enabled) { - try { - return fetchAllFilesInPartition(partitionPath); - } catch (Exception e) { - LOG.error("Failed to retrive files in partition " + partitionPath + " from metadata", e); - } - } - - return FSUtils.getFs(partitionPath.toString(), hadoopConf.get()).listStatus(partitionPath); - } - - /** - * Returns a list of all partitions. - */ - protected List<String> fetchAllPartitionPaths() throws IOException { - HoodieTimer timer = new HoodieTimer().startTimer(); - Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(RECORDKEY_PARTITION_LIST); - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer())); - - List<String> partitions = Collections.emptyList(); - if (hoodieRecord.isPresent()) { - if (!hoodieRecord.get().getData().getDeletions().isEmpty()) { - throw new HoodieMetadataException("Metadata partition list record is inconsistent: " - + hoodieRecord.get().getData()); - } - - partitions = hoodieRecord.get().getData().getFilenames(); - // Partition-less tables have a single empty partition - if (partitions.contains(NON_PARTITIONED_NAME)) { - partitions.remove(NON_PARTITIONED_NAME); - partitions.add(""); - } - } - - if (validateLookups) { - // Validate the Metadata Table data by listing the partitions from the file system - timer.startTimer(); - FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning); - List<String> actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths(); - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer())); - - Collections.sort(actualPartitions); - Collections.sort(partitions); - if (!actualPartitions.equals(partitions)) { - LOG.error("Validation of metadata partition list failed. Lists do not match."); - LOG.error("Partitions from metadata: " + Arrays.toString(partitions.toArray())); - LOG.error("Partitions from file system: " + Arrays.toString(actualPartitions.toArray())); - - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0)); - } - - // Return the direct listing as it should be correct - partitions = actualPartitions; - } - - LOG.info("Listed partitions from metadata: #partitions=" + partitions.size()); - return partitions; - } - - /** - * Return all the files from the partition. - * - * @param partitionPath The absolute path of the partition - */ - FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException { - String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), partitionPath); - if (partitionName.isEmpty()) { - partitionName = NON_PARTITIONED_NAME; - } - - HoodieTimer timer = new HoodieTimer().startTimer(); - Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(partitionName); - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer())); - - FileStatus[] statuses = {}; - if (hoodieRecord.isPresent()) { - if (!hoodieRecord.get().getData().getDeletions().isEmpty()) { - throw new HoodieMetadataException("Metadata record for partition " + partitionName + " is inconsistent: " - + hoodieRecord.get().getData()); - } - statuses = hoodieRecord.get().getData().getFileStatuses(partitionPath); - } - - if (validateLookups) { - // Validate the Metadata Table data by listing the partitions from the file system - timer.startTimer(); - - // Ignore partition metadata file - FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath, - p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer())); - - List<String> directFilenames = Arrays.stream(directStatuses) - .map(s -> s.getPath().getName()).sorted() - .collect(Collectors.toList()); - - List<String> metadataFilenames = Arrays.stream(statuses) - .map(s -> s.getPath().getName()).sorted() - .collect(Collectors.toList()); - - if (!metadataFilenames.equals(directFilenames)) { - LOG.error("Validation of metadata file listing for partition " + partitionName + " failed."); - LOG.error("File list from metadata: " + Arrays.toString(metadataFilenames.toArray())); - LOG.error("File list from direct listing: " + Arrays.toString(directFilenames.toArray())); - - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0)); - } - - // Return the direct listing as it should be correct - statuses = directStatuses; - } - - LOG.info("Listed file in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length); - return statuses; - } - - /** - * Retrieve the merged {@code HoodieRecord} mapped to the given key. - * - * @param key The key of the record - */ - private Option<HoodieRecord<HoodieMetadataPayload>> getMergedRecordByKey(String key) throws IOException { + protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) throws IOException { openBaseAndLogFiles(); // Retrieve record from base file @@ -313,7 +129,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata { /** * Open readers to the base and log files. */ - private synchronized void openBaseAndLogFiles() throws IOException { + protected synchronized void openBaseAndLogFiles() throws IOException { if (logRecordScanner != null) { // Already opened return; @@ -371,19 +187,6 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata { } /** - * Return {@code True} if all Instants from the dataset have been synced with the Metadata Table. - */ - @Override - public boolean isInSync() { - return enabled && findInstantsToSync().isEmpty(); - } - - private List<HoodieInstant> findInstantsToSync() { - HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath); - return findInstantsToSync(datasetMetaClient); - } - - /** * Return an ordered list of instants which have not been synced to the Metadata Table. * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset */ diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java new file mode 100644 index 0000000..1dcd322 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.avro.Schema; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.DefaultSizeEstimator; +import org.apache.hudi.common.util.HoodieRecordSizeEstimator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +/** + * Provides functionality to convert timeline instants to table metadata records and then merge by key. Specify + * a filter to limit keys that are merged and stored in memory. + */ +public class HoodieMetadataMergedInstantRecordScanner { + + private static final Logger LOG = LogManager.getLogger(HoodieMetadataMergedInstantRecordScanner.class); + + HoodieTableMetaClient metaClient; + private List<HoodieInstant> instants; + private Option<String> lastSyncTs; + private Set<String> mergeKeyFilter; + protected final ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records; + + public HoodieMetadataMergedInstantRecordScanner(HoodieTableMetaClient metaClient, List<HoodieInstant> instants, + Option<String> lastSyncTs, Schema readerSchema, Long maxMemorySizeInBytes, + String spillableMapBasePath, Set<String> mergeKeyFilter) throws IOException { + this.metaClient = metaClient; + this.instants = instants; + this.lastSyncTs = lastSyncTs; + this.mergeKeyFilter = mergeKeyFilter != null ? mergeKeyFilter : Collections.emptySet(); + this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), + new HoodieRecordSizeEstimator(readerSchema)); + + scan(); + } + + /** + * Converts instants in scanner to metadata table records and processes each record. + * + * @param + * @throws IOException + */ + private void scan() { + for (HoodieInstant instant : instants) { + try { + Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(metaClient, instant, lastSyncTs); + if (records.isPresent()) { + records.get().forEach(record -> processNextRecord(record)); + } + } catch (Exception e) { + LOG.error(String.format("Got exception when processing timeline instant %s", instant.getTimestamp()), e); + throw new HoodieException(String.format("Got exception when processing timeline instant %s", instant.getTimestamp()), e); + } + } + } + + /** + * Process metadata table record by merging with existing record if it is a part of the key filter. + * + * @param hoodieRecord + */ + private void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) { + String key = hoodieRecord.getRecordKey(); + if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(key)) { + if (records.containsKey(key)) { + // Merge and store the merged record + HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData()); + records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue)); + } else { + // Put the record as is + records.put(key, hoodieRecord); + } + } + } + + /** + * Retrieve merged hoodie record for given key. + * + * @param key of the record to retrieve + * @return {@code HoodieRecord} if key was found else {@code Option.empty()} + */ + public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key) { + return Option.ofNullable((HoodieRecord) records.get(key)); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java new file mode 100644 index 0000000..3017e82 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.CleanerUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; + +/** + * A utility to convert timeline information to metadata table records. + */ +public class HoodieTableMetadataUtil { + + private static final Logger LOG = LogManager.getLogger(HoodieTableMetadataUtil.class); + + /** + * Converts a timeline instant to metadata table records. + * + * @param datasetMetaClient The meta client associated with the timeline instant + * @param instant to fetch and convert to metadata table records + * @return a list of metadata table records + * @throws IOException + */ + public static Option<List<HoodieRecord>> convertInstantToMetaRecords(HoodieTableMetaClient datasetMetaClient, HoodieInstant instant, Option<String> lastSyncTs) throws IOException { + HoodieTimeline timeline = datasetMetaClient.getActiveTimeline(); + Option<List<HoodieRecord>> records = Option.empty(); + ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced."); + + switch (instant.getAction()) { + case HoodieTimeline.CLEAN_ACTION: + HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, instant); + records = Option.of(convertMetadataToRecords(cleanMetadata, instant.getTimestamp())); + break; + case HoodieTimeline.DELTA_COMMIT_ACTION: + case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.COMPACTION_ACTION: + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); + records = Option.of(convertMetadataToRecords(commitMetadata, instant.getTimestamp())); + break; + case HoodieTimeline.ROLLBACK_ACTION: + HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata( + timeline.getInstantDetails(instant).get()); + records = Option.of(convertMetadataToRecords(rollbackMetadata, instant.getTimestamp(), lastSyncTs)); + break; + case HoodieTimeline.RESTORE_ACTION: + HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata( + timeline.getInstantDetails(instant).get()); + records = Option.of(convertMetadataToRecords(restoreMetadata, instant.getTimestamp(), lastSyncTs)); + break; + case HoodieTimeline.SAVEPOINT_ACTION: + // Nothing to be done here + break; + default: + throw new HoodieException("Unknown type of action " + instant.getAction()); + } + + return records; + } + + /** + * Finds all new files/partitions created as part of commit and creates metadata table records for them. + * + * @param commitMetadata + * @param instantTime + * @return a list of metadata table records + */ + public static List<HoodieRecord> convertMetadataToRecords(HoodieCommitMetadata commitMetadata, String instantTime) { + List<HoodieRecord> records = new LinkedList<>(); + List<String> allPartitions = new LinkedList<>(); + commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> { + final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName; + allPartitions.add(partition); + + Map<String, Long> newFiles = new HashMap<>(writeStats.size()); + writeStats.forEach(hoodieWriteStat -> { + String pathWithPartition = hoodieWriteStat.getPath(); + if (pathWithPartition == null) { + // Empty partition + LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat); + return; + } + + int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1; + String filename = pathWithPartition.substring(offset); + ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata"); + newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes()); + }); + + // New files added to a partition + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord( + partition, Option.of(newFiles), Option.empty()); + records.add(record); + }); + + // New partitions created + HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions)); + records.add(record); + + LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType() + + ". #partitions_updated=" + records.size()); + return records; + } + + /** + * Finds all files that will be deleted as part of a planned clean and creates metadata table records for them. + * + * @param cleanerPlan from timeline to convert + * @param instantTime + * @return a list of metadata table records + */ + public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanerPlan cleanerPlan, String instantTime) { + List<HoodieRecord> records = new LinkedList<>(); + + int[] fileDeleteCount = {0}; + cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> { + fileDeleteCount[0] += deletedPathInfo.size(); + + // Files deleted from a partition + List<String> deletedFilenames = deletedPathInfo.stream().map(p -> new Path(p.getFilePath()).getName()) + .collect(Collectors.toList()); + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), + Option.of(deletedFilenames)); + records.add(record); + }); + + LOG.info("Found at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size() + + ", #files_deleted=" + fileDeleteCount[0]); + return records; + } + + /** + * Finds all files that were deleted as part of a clean and creates metadata table records for them. + * + * @param cleanMetadata + * @param instantTime + * @return a list of metadata table records + */ + public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanMetadata cleanMetadata, String instantTime) { + List<HoodieRecord> records = new LinkedList<>(); + int[] fileDeleteCount = {0}; + + cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { + // Files deleted from a partition + List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles(); + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), + Option.of(new ArrayList<>(deletedFiles))); + + records.add(record); + fileDeleteCount[0] += deletedFiles.size(); + }); + + LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size() + + ", #files_deleted=" + fileDeleteCount[0]); + return records; + } + + /** + * Aggregates all files deleted and appended to from all rollbacks associated with a restore operation then + * creates metadata table records for them. + * + * @param restoreMetadata + * @param instantTime + * @return a list of metadata table records + */ + public static List<HoodieRecord> convertMetadataToRecords(HoodieRestoreMetadata restoreMetadata, String instantTime, Option<String> lastSyncTs) { + Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>(); + Map<String, List<String>> partitionToDeletedFiles = new HashMap<>(); + restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> { + rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs)); + }); + + return convertFilesToRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore"); + } + + public static List<HoodieRecord> convertMetadataToRecords(HoodieRollbackMetadata rollbackMetadata, String instantTime, Option<String> lastSyncTs) { + + Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>(); + Map<String, List<String>> partitionToDeletedFiles = new HashMap<>(); + processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs); + return convertFilesToRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback"); + } + + /** + * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}. + * + * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This + * function will extract this change file for each partition. + * + * @param rollbackMetadata {@code HoodieRollbackMetadata} + * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition. + * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes. + */ + private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata, + Map<String, List<String>> partitionToDeletedFiles, + Map<String, Map<String, Long>> partitionToAppendedFiles, + Option<String> lastSyncTs) { + + rollbackMetadata.getPartitionMetadata().values().forEach(pm -> { + // If commit being rolled back has not been synced to metadata table yet then there is no need to update metadata + if (lastSyncTs.isPresent() && HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get())) { + return; + } + + final String partition = pm.getPartitionPath(); + if (!pm.getSuccessDeleteFiles().isEmpty()) { + if (!partitionToDeletedFiles.containsKey(partition)) { + partitionToDeletedFiles.put(partition, new ArrayList<>()); + } + + // Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles() + List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName()) + .collect(Collectors.toList()); + partitionToDeletedFiles.get(partition).addAll(deletedFiles); + } + + if (!pm.getAppendFiles().isEmpty()) { + if (!partitionToAppendedFiles.containsKey(partition)) { + partitionToAppendedFiles.put(partition, new HashMap<>()); + } + + // Extract appended file name from the absolute paths saved in getAppendFiles() + pm.getAppendFiles().forEach((path, size) -> { + partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> { + return size + oldSize; + }); + }); + } + }); + } + + private static List<HoodieRecord> convertFilesToRecords(Map<String, List<String>> partitionToDeletedFiles, + Map<String, Map<String, Long>> partitionToAppendedFiles, String instantTime, + String operation) { + List<HoodieRecord> records = new LinkedList<>(); + int[] fileChangeCount = {0, 0}; // deletes, appends + + partitionToDeletedFiles.forEach((partition, deletedFiles) -> { + fileChangeCount[0] += deletedFiles.size(); + + Option<Map<String, Long>> filesAdded = Option.empty(); + if (partitionToAppendedFiles.containsKey(partition)) { + filesAdded = Option.of(partitionToAppendedFiles.remove(partition)); + } + + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded, + Option.of(new ArrayList<>(deletedFiles))); + records.add(record); + }); + + partitionToAppendedFiles.forEach((partition, appendedFileMap) -> { + fileChangeCount[1] += appendedFileMap.size(); + + // Validate that no appended file has been deleted + ValidationUtils.checkState( + !appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())), + "Rollback file cannot both be appended and deleted"); + + // New files added to a partition + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap), + Option.empty()); + records.add(record); + }); + + LOG.info("Found at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size() + + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]); + + return records; + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index c3843cc..f315a26 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -86,7 +86,9 @@ class TestCOWDataSource extends HoodieClientTestBase { } @ParameterizedTest - @ValueSource(booleans = Array(true, false)) + //TODO(metadata): Needs HUDI-1459 to be fixed + //@ValueSource(booleans = Array(true, false)) + @ValueSource(booleans = Array(false)) def testCopyOnWriteStorage(isMetadataEnabled: Boolean) { // Insert Operation val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
