This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4e642268442782cdd7ad753981dd2571388cd189
Author: Udit Mehrotra <[email protected]>
AuthorDate: Thu Dec 31 01:20:02 2020 -0800

    [HUDI-1450] Use metadata table for listing in HoodieROTablePathFilter (apache#2326)

    [HUDI-1394] [RFC-15] Use metadata table (if present) to get all partition paths (apache#2351)
---
 .../apache/hudi/cli/commands/MetadataCommand.java  |  2 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  1 +
 .../metadata/HoodieBackedTableMetadataWriter.java  |  9 +--
 .../java/org/apache/hudi/table/HoodieTable.java    |  2 +-
 .../PartitionAwareClusteringPlanStrategy.java      |  4 +-
 .../hudi/table/action/rollback/RollbackUtils.java  |  9 +--
 .../action/savepoint/SavepointActionExecutor.java  | 22 ++++---
 .../FlinkCopyOnWriteRollbackActionExecutor.java    |  4 +-
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |  2 +-
 .../index/bloom/SparkHoodieGlobalBloomIndex.java   |  2 +-
 .../index/simple/SparkHoodieGlobalSimpleIndex.java |  3 +-
 ...rkInsertOverwriteTableCommitActionExecutor.java |  3 +-
 .../HoodieSparkMergeOnReadTableCompactor.java      |  2 +-
 .../SparkCopyOnWriteRollbackActionExecutor.java    |  4 +-
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |  2 +-
 .../org/apache/hudi/client/TestClientRollback.java |  4 +-
 .../apache/hudi/metadata/TestHoodieFsMetadata.java |  4 +-
 .../hudi/common}/config/HoodieMetadataConfig.java  | 11 ++--
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  9 ++-
 .../common/table/view/FileSystemViewManager.java   | 17 ++++++
 .../metadata/FileSystemBackedTableMetadata.java    | 69 ++++++++++++++++++++++
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 24 ++++----
 .../metadata/HoodieMetadataFileSystemView.java     | 21 +++++--
 .../hudi/hadoop/HoodieROTablePathFilter.java       | 17 +++++-
 .../reader/DFSHoodieDatasetInputReader.java        |  4 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  6 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  7 +++
 .../java/org/apache/hudi/client/TestBootstrap.java | 19 ++++--
 .../apache/hudi/functional/TestCOWDataSource.scala | 15 ++++-
 .../java/org/apache/hudi/dla/DLASyncConfig.java    | 12 ++++
 .../java/org/apache/hudi/dla/HoodieDLAClient.java  |  3 +-
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  | 48 +++++++++------
 .../org/apache/hudi/hive/HoodieHiveClient.java     |  2 +-
 .../hudi/sync/common/AbstractSyncHoodieClient.java | 11 +++-
 .../hudi/utilities/HoodieSnapshotCopier.java       | 16 ++++-
 .../hudi/utilities/HoodieSnapshotExporter.java     |  2 +-
 .../hudi/utilities/perf/TimelineServerPerf.java    | 10 +++-
 .../functional/TestHoodieSnapshotCopier.java       |  8 ++-
 38 files changed, 308 insertions(+), 102 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index 68ff1d1..f8a8eed 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -23,9 +23,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadata;
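For illustration only (not part of the applied patch): HoodieMetadataConfig moves out of hudi-client-common into hudi-common, so reader-side modules such as the CLI, hudi-hadoop-mr, and the sync tools can depend on it. Downstream code only needs to update the import, e.g.:

    // before (hudi-client-common):
    // import org.apache.hudi.config.HoodieMetadataConfig;
    // after (hudi-common):
    import org.apache.hudi.common.config.HoodieMetadataConfig;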
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index ae56454..138f1be 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -21,6 +21,7 @@ package org.apache.hudi.config;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.bootstrap.BootstrapMode;
 import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.client.common.EngineType;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 98d314d..9282e3b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
@@ -46,7 +47,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -275,14 +275,15 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     // List all partitions in the basePath of the containing dataset
     FileSystem fs = datasetMetaClient.getFs();
-    List<String> partitions = FSUtils.getAllPartitionPaths(fs, datasetWriteConfig.getBasePath(), datasetWriteConfig.shouldAssumeDatePartitioning());
+    FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(),
+        datasetWriteConfig.shouldAssumeDatePartitioning());
+    List<String> partitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
     LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");

     // List all partitions in parallel and collect the files in them
     int parallelism = Math.max(partitions.size(), 1);
     List<Pair<String, FileStatus[]>> partitionFileList = engineContext.map(partitions, partition -> {
-      FileSystem fsys = datasetMetaClient.getFs();
-      FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new Path(datasetWriteConfig.getBasePath(), partition));
+      FileStatus[] statuses = fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition));
       return Pair.of(partition, statuses);
     }, parallelism);
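For illustration only (not part of the applied patch): a minimal, self-contained sketch of the new FileSystemBackedTableMetadata used above. The table base path is hypothetical; the class and both methods are defined later in this patch.

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.config.SerializableConfiguration;
    import org.apache.hudi.metadata.FileSystemBackedTableMetadata;

    public class FsListingSketch {
      public static void main(String[] args) throws Exception {
        String basePath = "/tmp/hudi_trips_cow"; // hypothetical table location
        SerializableConfiguration conf = new SerializableConfiguration(new Configuration());

        // assumeDatePartitioning=false => discover partitions via partition metadata files
        FileSystemBackedTableMetadata fsMetadata =
            new FileSystemBackedTableMetadata(conf, basePath, false);

        List<String> partitions = fsMetadata.getAllPartitionPaths();
        for (String partition : partitions) {
          FileStatus[] files = fsMetadata.getAllFilesInPartition(new Path(basePath, partition));
          System.out.println(partition + " -> " + files.length + " data files");
        }
      }
    }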
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 684df39..b268512 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -277,7 +277,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
   private SyncableFileSystemView getFileSystemViewInternal(HoodieTimeline timeline) {
     if (config.useFileListingMetadata()) {
       FileSystemViewStorageConfig viewConfig = config.getViewStorageConfig();
-      return new HoodieMetadataFileSystemView(metaClient, this, timeline, viewConfig.isIncrementalTimelineSyncEnabled());
+      return new HoodieMetadataFileSystemView(metaClient, this.metadata, timeline, viewConfig.isIncrementalTimelineSyncEnabled());
     } else {
       return getViewManager().getFileSystemView(metaClient);
     }

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 404cc02..5a42cdc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -66,8 +66,10 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
     try {
       HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
       LOG.info("Scheduling clustering for " + metaClient.getBasePath());
+      HoodieWriteConfig config = getWriteConfig();
       List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
-          getWriteConfig().shouldAssumeDatePartitioning());
+          config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
+          config.shouldAssumeDatePartitioning());

       // filter the partition paths if needed to reduce list status
       partitionPaths = filterPartitionPaths(partitionPaths);

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index 897b448..bb59a66 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -88,12 +88,13 @@ public class RollbackUtils {
    * Generate all rollback requests that needs rolling back this action without actually performing rollback for COW table type.
    * @param fs instance of {@link FileSystem} to use.
    * @param basePath base path of interest.
-   * @param shouldAssumeDatePartitioning {@code true} if date partitioning should be assumed. {@code false} otherwise.
+   * @param config instance of {@link HoodieWriteConfig} to use.
    * @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected.
   */
-  public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(FileSystem fs, String basePath, boolean shouldAssumeDatePartitioning) {
+  public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(FileSystem fs, String basePath, HoodieWriteConfig config) {
     try {
-      return FSUtils.getAllPartitionPaths(fs, basePath, shouldAssumeDatePartitioning).stream()
+      return FSUtils.getAllPartitionPaths(fs, basePath, config.useFileListingMetadata(),
+          config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()).stream()
          .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
          .collect(Collectors.toList());
     } catch (IOException e) {
@@ -113,7 +114,7 @@ public class RollbackUtils {
     String commit = instantToRollback.getTimestamp();
     HoodieWriteConfig config = table.getConfig();
     List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-        config.shouldAssumeDatePartitioning());
+        config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
     int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
     context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all rollback requests");
     return context.flatMap(partitions, partitionPath -> {

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 90a96b9..c1d2c4a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

@@ -89,15 +90,18 @@ public class SavepointActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
           "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);

       context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
-      Map<String, List<String>> latestFilesMap = context.mapToPair(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
-          table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()), partitionPath -> {
-            // Scan all partitions files with this commit time
-            LOG.info("Collecting latest files in partition path " + partitionPath);
-            TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
-            List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
-                .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
-            return new ImmutablePair<>(partitionPath, latestFiles);
-          }, null);
+      List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
+          table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
+          config.shouldAssumeDatePartitioning()
+      );
+      Map<String, List<String>> latestFilesMap = context.mapToPair(partitions, partitionPath -> {
+        // Scan all partitions files with this commit time
+        LOG.info("Collecting latest files in partition path " + partitionPath);
+        TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
+        List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
+            .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
+        return new ImmutablePair<>(partitionPath, latestFiles);
+      }, null);
       HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
       // Nothing to save in the savepoint
       table.getActiveTimeline().createNewInstant(

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
index 28b713b..6221dd5 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
@@ -64,8 +64,8 @@ public class FlinkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayloa
   @Override
   protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
-    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-        config.shouldAssumeDatePartitioning());
+    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
+        table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config);
     return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
   }
 }

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 1fa3ad0..17453bb 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -94,7 +94,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
     List<ListingBasedRollbackRequest> rollbackRequests;
     if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
       rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-          table.getConfig().shouldAssumeDatePartitioning());
+          table.getConfig());
     } else {
       rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
     }

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java
index 771c01a..310dbd2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java
@@ -64,7 +64,7 @@ public class SparkHoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     try {
       List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
-          config.shouldAssumeDatePartitioning());
+          config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
       return super.loadInvolvedFiles(allPartitionPaths, context, hoodieTable);
     } catch (IOException e) {
       throw new HoodieIOException("Failed to load all partitions", e);

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java
index bdb4991..092c62b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java
@@ -104,7 +104,8 @@ public class SparkHoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends
       final HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     try {
-      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning());
+      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
+          config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
       // Obtain the latest data files from all the partitions.
       return getLatestBaseFilesForAllPartitions(allPartitionPaths, context, hoodieTable);
     } catch (IOException e) {

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
index c014515..6cc474f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
@@ -51,7 +51,8 @@ public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecor
     Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
     try {
       List<String> partitionPaths = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
-          table.getMetaClient().getBasePath(), false);
+          table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
+          false);
       JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
       if (partitionPaths != null && partitionPaths.size() > 0) {
         context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index 96d52a1..21ffd46 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -196,7 +196,7 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
     List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
-        config.shouldAssumeDatePartitioning());
+        config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());

     // filter the partition paths if needed to reduce list status
     partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java
index 965d805..b770bbf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java
@@ -66,8 +66,8 @@ public class SparkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayloa
   @Override
   protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
-    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-        config.shouldAssumeDatePartitioning());
+    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(),
+        table.getMetaClient().getBasePath(), config);
     return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
   }
 }

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 7e3faf3..1a1cb3f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -93,7 +93,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
     List<ListingBasedRollbackRequest> rollbackRequests;
     if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
       rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-          table.getConfig().shouldAssumeDatePartitioning());
+          table.getConfig());
     } else {
       rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
     }

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index d04a2df..eab9bb1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -99,8 +99,10 @@ public class TestClientRollback extends HoodieClientTestBase {
     statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
     // Verify there are no errors
     assertNoWriteErrors(statuses);
+    HoodieWriteConfig config = getConfig();
     List<String> partitionPaths =
-        FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
+        FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), config.useFileListingMetadata(),
+            config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
     final BaseFileOnlyView view1 = table.getBaseFileOnlyView();

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
index e5b1b9f..b9c3511 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
@@ -60,7 +61,6 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -736,7 +736,7 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
     // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters all directory
     // in the .hoodie folder.
     List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(fs, HoodieTableMetadata.getMetadataTableBasePath(basePath),
-        false);
+        false, false, false);
     assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());

     // Metadata table should automatically compact and clean

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
similarity index 94%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
rename to hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 54c4ac3..02e67e1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -16,12 +16,9 @@
  * limitations under the License.
  */

-package org.apache.hudi.config;
-
-import org.apache.hudi.common.config.DefaultHoodieConfig;
+package org.apache.hudi.common.config;

 import javax.annotation.concurrent.Immutable;
-
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
@@ -31,7 +28,7 @@ import java.util.Properties;

 /**
  * Configurations used by the HUDI Metadata Table.
  */
 @Immutable
-public class HoodieMetadataConfig extends DefaultHoodieConfig {
+public final class HoodieMetadataConfig extends DefaultHoodieConfig {

   public static final String METADATA_PREFIX = "hoodie.metadata";

@@ -65,6 +62,10 @@ public class HoodieMetadataConfig extends DefaultHoodieConfig {
   public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX + ".cleaner.commits.retained";
   public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3;

+  // We can set the default to true for readers, as it will internally default to listing from filesystem if metadata
+  // table is not found
+  public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = true;
+
   private HoodieMetadataConfig(Properties props) {
     super(props);
   }

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 94d05b3..d671ec8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.InvalidHoodiePathException;
+import org.apache.hudi.metadata.HoodieTableMetadata;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -251,12 +252,14 @@ public class FSUtils {
     }
   }

-  public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr, boolean assumeDatePartitioning)
-      throws IOException {
+  public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr, boolean useFileListingFromMetadata, boolean verifyListings,
+      boolean assumeDatePartitioning) throws IOException {
     if (assumeDatePartitioning) {
       return getAllPartitionFoldersThreeLevelsDown(fs, basePathStr);
     } else {
-      return getAllFoldersWithPartitionMetaFile(fs, basePathStr);
+      HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(fs.getConf(), basePathStr, "/tmp/", useFileListingFromMetadata,
+          verifyListings, false, false);
+      return tableMetadata.getAllPartitionPaths();
     }
   }
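For illustration only (not part of the applied patch): a caller-side sketch of the widened FSUtils.getAllPartitionPaths signature introduced above. The base path is hypothetical.

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.fs.FSUtils;

    public class ListPartitionsSketch {
      public static void main(String[] args) throws Exception {
        String basePath = "/tmp/hudi_trips_cow"; // hypothetical
        FileSystem fs = new Path(basePath).getFileSystem(new Configuration());
        // args: fs, basePath, useFileListingFromMetadata, verifyListings, assumeDatePartitioning
        List<String> partitions = FSUtils.getAllPartitionPaths(fs, basePath, true, false, false);
        partitions.forEach(System.out::println);
      }
    }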
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index d310181..c5a31fa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Functions.Function2;
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView;

 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -158,6 +159,22 @@ public class FileSystemViewManager {
     return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
   }

+  public static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieTableMetaClient metaClient,
+                                                                       boolean useFileListingFromMetadata,
+                                                                       boolean verifyListings) {
+    LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath());
+    if (useFileListingFromMetadata) {
+      return new HoodieMetadataFileSystemView(metaClient,
+          metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+          true,
+          verifyListings);
+    }
+
+    return new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
+  }
+
+
   /**
    * Create a remote file System view for a table.
    *

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
new file mode 100644
index 0000000..73ce8e4
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
+
+  private final SerializableConfiguration hadoopConf;
+  private final String datasetBasePath;
+  private final boolean assumeDatePartitioning;
+
+  public FileSystemBackedTableMetadata(SerializableConfiguration conf, String datasetBasePath, boolean assumeDatePartitioning) {
+    this.hadoopConf = conf;
+    this.datasetBasePath = datasetBasePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+  }
+
+  @Override
+  public FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException {
+    FileSystem fs = partitionPath.getFileSystem(hadoopConf.get());
+    return FSUtils.getAllDataFilesInPartition(fs, partitionPath);
+  }
+
+  @Override
+  public List<String> getAllPartitionPaths() throws IOException {
+    FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
+    if (assumeDatePartitioning) {
+      return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
+    } else {
+      return FSUtils.getAllFoldersWithPartitionMetaFile(fs, datasetBasePath);
+    }
+  }
+
+  @Override
+  public Option<String> getSyncedInstantTime() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isInSync() {
+    throw new UnsupportedOperationException();
+  }
+}
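For illustration only (not part of the applied patch): the new createInMemoryFileSystemView factory can be exercised roughly as follows. The base path is hypothetical; the meta-client constructor and getLatestBaseFiles are used elsewhere in this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.view.FileSystemViewManager;
    import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

    public class InMemoryViewSketch {
      public static void main(String[] args) {
        String basePath = "/tmp/hudi_trips_cow"; // hypothetical
        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(), basePath);
        // Prefer metadata-table listings; skip cross-verification against the file system
        HoodieTableFileSystemView view =
            FileSystemViewManager.createInMemoryFileSystemView(metaClient, true, false);
        view.getLatestBaseFiles().forEach(f -> System.out.println(f.getPath()));
      }
    }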
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index cdff41c..4858e6e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -30,7 +30,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -85,7 +84,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
   private final String spillableMapDirectory;

   // Readers for the base and log file which store the metadata
-  private transient HoodieFileReader<GenericRecord> basefileReader;
+  private transient HoodieFileReader<GenericRecord> baseFileReader;
   private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;

   public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory,
@@ -108,7 +107,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
     try {
       this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath);
     } catch (TableNotFoundException e) {
-      LOG.error("Metadata table was not found at path " + metadataBasePath);
+      LOG.warn("Metadata table was not found at path " + metadataBasePath);
       this.enabled = false;
     } catch (Exception e) {
       LOG.error("Failed to initialize metadata table at path " + metadataBasePath, e);
@@ -144,9 +143,7 @@
         LOG.error("Failed to retrieve list of partition from metadata", e);
       }
     }
-
-    FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf.get());
-    return FSUtils.getAllPartitionPaths(fs, datasetBasePath, assumeDatePartitioning);
+    return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning).getAllPartitionPaths();
   }

   /**
@@ -199,7 +196,8 @@
     if (validateLookups) {
       // Validate the Metadata Table data by listing the partitions from the file system
       timer.startTimer();
-      List<String> actualPartitions = FSUtils.getAllPartitionPaths(metaClient.getFs(), datasetBasePath, false);
+      FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning);
+      List<String> actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
       metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer()));

       Collections.sort(actualPartitions);
@@ -287,9 +285,9 @@
     // Retrieve record from base file
     HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
-    if (basefileReader != null) {
+    if (baseFileReader != null) {
       HoodieTimer timer = new HoodieTimer().startTimer();
-      Option<GenericRecord> baseRecord = basefileReader.getRecordByKey(key);
+      Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
       if (baseRecord.isPresent()) {
         hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
             metaClient.getTableConfig().getPayloadClass());
@@ -338,7 +336,7 @@
       Option<HoodieBaseFile> basefile = latestSlices.get(0).getBaseFile();
       if (basefile.isPresent()) {
         String basefilePath = basefile.get().getPath();
-        basefileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
+        baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
         LOG.info("Opened metadata base file from " + basefilePath + " at instant " + basefile.get().getCommitTime());
       }

@@ -365,9 +363,9 @@
   }

   public void closeReaders() {
-    if (basefileReader != null) {
-      basefileReader.close();
-      basefileReader = null;
+    if (baseFileReader != null) {
+      baseFileReader.close();
+      baseFileReader = null;
     }
     logRecordScanner = null;
   }

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
similarity index 68%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
rename to hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
index 8c23ea8..9f9e405 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
@@ -24,19 +24,30 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.table.HoodieTable;

 /**
  * {@code HoodieTableFileSystemView} implementation that retrieved partition listings from the Metadata Table.
  */
 public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {

-  private HoodieTable hoodieTable;
-  public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, HoodieTable table,
+  private final HoodieTableMetadata tableMetadata;
+
+  public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, HoodieTableMetadata tableMetadata,
                                       HoodieTimeline visibleActiveTimeline, boolean enableIncrementalTimelineSync) {
     super(metaClient, visibleActiveTimeline, enableIncrementalTimelineSync);
-    this.hoodieTable = table;
+    this.tableMetadata = tableMetadata;
+  }
+
+  public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient,
+                                      HoodieTimeline visibleActiveTimeline,
+                                      boolean useFileListingFromMetadata,
+                                      boolean verifyListings) {
+    super(metaClient, visibleActiveTimeline);
+    this.tableMetadata = HoodieTableMetadata.create(metaClient.getHadoopConf(), metaClient.getBasePath(),
+        FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR, useFileListingFromMetadata, verifyListings,
+        false, false);
   }

   /**
@@ -47,6 +58,6 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
    */
   @Override
   protected FileStatus[] listPartition(Path partitionPath) throws IOException {
-    return hoodieTable.metadata().getAllFilesInPartition(partitionPath);
+    return tableMetadata.getAllFilesInPartition(partitionPath);
   }
 }

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 1e616f8..baedb16 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -22,9 +22,11 @@ import java.util.Map;
 import java.util.Set;

 import org.apache.hadoop.conf.Configurable;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
@@ -43,6 +45,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.stream.Collectors;

+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
+
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
  * always accept
@@ -163,9 +170,13 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
           metaClientCache.put(baseDir.toString(), metaClient);
         }

-        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
-            metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), fs.listStatus(folder));
-        List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles().collect(Collectors.toList());
+        boolean useFileListingFromMetadata = getConf().getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
+        boolean verifyFileListing = getConf().getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
+        HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient,
+            useFileListingFromMetadata, verifyFileListing);
+        String partition = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), folder);
+
+        List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles(partition).collect(Collectors.toList());
         // populate the cache
         if (!hoodiePathCache.containsKey(folder.toString())) {
           hoodiePathCache.put(folder.toString(), new HashSet<>());
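For illustration only (not part of the applied patch): because the path filter now consults its Hadoop Configuration, readers can toggle metadata-based listing without code changes. A minimal sketch using only the constants referenced above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.hadoop.HoodieROTablePathFilter;

    public class PathFilterConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Defaults to enabled for readers; flip to false to force file-system listings
        conf.setBoolean(HoodieMetadataConfig.METADATA_ENABLE_PROP, false);
        conf.setBoolean(HoodieMetadataConfig.METADATA_VALIDATE_PROP, false);

        HoodieROTablePathFilter filter = new HoodieROTablePathFilter();
        filter.setConf(conf); // the filter reads both flags from this Configuration
      }
    }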
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 43d5fde..a41da2c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -19,6 +19,7 @@ package org.apache.hudi.integ.testsuite.reader;

 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -85,7 +86,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
     // Using FSUtils.getFS here instead of metaClient.getFS() since we dont want to count these listStatus
     // calls in metrics as they are not part of normal HUDI operation.
     FileSystem fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf());
-    List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath(), false);
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath(),
+        HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false);
     // Sort partition so we can pick last N partitions by default
     Collections.sort(partitionPaths);
     if (!partitionPaths.isEmpty()) {

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 988c9f9..472f450 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.HoodieWriteResult
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
@@ -38,7 +38,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, B
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
-import org.apache.hudi.internal.{HoodieDataSourceInternalWriter, DataSourceInternalWriterHelper}
+import org.apache.hudi.internal.{DataSourceInternalWriterHelper, HoodieDataSourceInternalWriter}
 import org.apache.hudi.sync.common.AbstractSyncTool
 import org.apache.log4j.LogManager
 import org.apache.spark.SPARK_VERSION
@@ -372,6 +372,8 @@ private[hudi] object HoodieSparkSqlWriter {
       ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
     hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
     hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
+    hiveSyncConfig.useFileListingFromMetadata = parameters(HoodieMetadataConfig.METADATA_ENABLE_PROP).toBoolean
+    hiveSyncConfig.verifyMetadataFileListing = parameters(HoodieMetadataConfig.METADATA_VALIDATE_PROP).toBoolean
     hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
     hiveSyncConfig.decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY, DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL).toBoolean

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 294050b..02b5abd 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -23,6 +23,11 @@ import org.apache.hudi.common.config.TypedProperties
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters.mapAsScalaMapConverter

+import org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE
+import org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE
+import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP
+import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP
+
 /**
  * WriterUtils to assist in write path in Datasource and tests.
  */
@@ -46,6 +51,8 @@ object HoodieWriterUtils {
     RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
     PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
     KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
+    METADATA_ENABLE_PROP -> DEFAULT_METADATA_ENABLE.toString,
+    METADATA_VALIDATE_PROP -> DEFAULT_METADATA_VALIDATE.toString,
     COMMIT_METADATA_KEYPREFIX_OPT_KEY -> DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL,
     INSERT_DROP_DUPS_OPT_KEY -> DEFAULT_INSERT_DROP_DUPS_OPT_VAL,
     STREAMING_RETRY_CNT_OPT_KEY -> DEFAULT_STREAMING_RETRY_CNT_OPT_VAL,

diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
index 7e13a5e..521ff05 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
@@ -28,6 +28,7 @@ import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelect
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
@@ -372,7 +373,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
     assertEquals(totalRecords, records.size());
@@ -390,7 +392,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
     assertEquals(totalRecords, records.size());
@@ -406,7 +409,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true,
         HoodieRecord.HOODIE_META_COLUMNS);
@@ -423,7 +427,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true,
         HoodieRecord.HOODIE_META_COLUMNS);
@@ -438,7 +443,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true,
         Arrays.asList("_row_key"));
@@ -455,7 +461,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true,
         Arrays.asList("_row_key"));
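For illustration only (not part of the applied patch): the parameterized test below toggles hoodie.metadata.enable through the datasource options. The same toggle applies to any Spark write; a hedged Java sketch in which the table name, record-key field, and paths are hypothetical and only the minimal options are shown:

    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class MetadataEnabledWriteSketch {
      public static void main(String[] args) {
        // intended to run via spark-submit with the Hudi bundle on the classpath
        SparkSession spark = SparkSession.builder().appName("metadata-listing-sketch").getOrCreate();
        Dataset<Row> df = spark.read().json("/tmp/trips.json"); // hypothetical input

        df.write().format("org.apache.hudi")
            .option("hoodie.table.name", "trips")                      // hypothetical table name
            .option("hoodie.datasource.write.recordkey.field", "uuid") // hypothetical key field
            .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, true)   // maintain the metadata table
            .mode(SaveMode.Append)
            .save("/tmp/hudi_trips_cow"); // hypothetical base path
      }
    }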
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 51ca72e..c3843cc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -22,6 +22,7 @@ import java.util.function.Supplier
 import java.util.stream.Stream

 import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -34,6 +35,8 @@ import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.types.{DataTypes, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource

 import scala.collection.JavaConversions._

@@ -82,13 +85,16 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
   }

-  @Test
-  def testCopyOnWriteStorage() {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testCopyOnWriteStorage(isMetadataEnabled: Boolean) {
     // Insert Operation
     val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .mode(SaveMode.Overwrite)
       .save(basePath)

@@ -96,7 +102,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)

     // Snapshot query
-    val snapshotDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
+    val snapshotDF1 = spark.read.format("org.apache.hudi")
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
+      .load(basePath + "/*/*/*/*")
     assertEquals(100, snapshotDF1.count())

     // Upsert based on the written table with Hudi metadata columns
@@ -120,6 +128,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     inputDF2.write.format("org.apache.hudi")
       .options(commonOpts)
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .mode(SaveMode.Append)
       .save(basePath)

@@ -128,6 +137,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     // Snapshot Query
     val snapshotDF3 = spark.read.format("org.apache.hudi")
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .load(basePath + "/*/*/*/*")
     assertEquals(100, snapshotDF3.count()) // still 100, since we only updated
@@ -149,6 +159,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
     emptyDF.write.format("org.apache.hudi")
       .options(commonOpts)
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .mode(SaveMode.Append)
       .save(basePath)

diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
index 8a92de6..5b50ada 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
@@ -19,6 +19,8 @@ package org.apache.hudi.dla;

 import com.beust.jcommander.Parameter;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;

 import java.io.Serializable;
@@ -68,6 +70,12 @@ public class DLASyncConfig implements Serializable {
   @Parameter(names = {"--hive-style-partitioning"}, description = "Use DLA hive style partitioning, true if like the following style: field1=value1/field2=value2")
   public Boolean useDLASyncHiveStylePartitioning = false;

+  @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+  public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+  @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
+  public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;

@@ -88,6 +96,8 @@ public class DLASyncConfig implements Serializable {
     newConfig.skipROSuffix = cfg.skipROSuffix;
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 6c8fd8f..dd9d483 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.hive;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+
 import com.beust.jcommander.Parameter;
 
 import java.io.Serializable;
@@ -77,6 +79,12 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
   public Boolean skipROSuffix = false;
 
+  @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+  public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+  @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
+  public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;
@@ -99,6 +107,8 @@ public class HiveSyncConfig implements Serializable {
     newConfig.jdbcUrl = cfg.jdbcUrl;
     newConfig.tableName = cfg.tableName;
     newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
+    newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
+    newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
     newConfig.supportTimestamp = cfg.supportTimestamp;
     newConfig.decodePartition = cfg.decodePartition;
     return newConfig;
@@ -107,23 +117,25 @@ public class HiveSyncConfig implements Serializable {
   @Override
   public String toString() {
     return "HiveSyncConfig{"
-        + "databaseName='" + databaseName + '\''
-        + ", tableName='" + tableName + '\''
-        + ", baseFileFormat='" + baseFileFormat + '\''
-        + ", hiveUser='" + hiveUser + '\''
-        + ", hivePass='" + hivePass + '\''
-        + ", jdbcUrl='" + jdbcUrl + '\''
-        + ", basePath='" + basePath + '\''
-        + ", partitionFields=" + partitionFields
-        + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
-        + ", assumeDatePartitioning=" + assumeDatePartitioning
-        + ", usePreApacheInputFormat=" + usePreApacheInputFormat
-        + ", useJdbc=" + useJdbc
-        + ", autoCreateDatabase=" + autoCreateDatabase
-        + ", skipROSuffix=" + skipROSuffix
-        + ", help=" + help
-        + ", supportTimestamp=" + supportTimestamp
-        + ", decodePartition=" + decodePartition
-        + '}';
+        + "databaseName='" + databaseName + '\''
+        + ", tableName='" + tableName + '\''
+        + ", baseFileFormat='" + baseFileFormat + '\''
+        + ", hiveUser='" + hiveUser + '\''
+        + ", hivePass='" + hivePass + '\''
+        + ", jdbcUrl='" + jdbcUrl + '\''
+        + ", basePath='" + basePath + '\''
+        + ", partitionFields=" + partitionFields
+        + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
+        + ", assumeDatePartitioning=" + assumeDatePartitioning
+        + ", usePreApacheInputFormat=" + usePreApacheInputFormat
+        + ", useJdbc=" + useJdbc
+        + ", autoCreateDatabase=" + autoCreateDatabase
+        + ", skipROSuffix=" + skipROSuffix
+        + ", help=" + help
+        + ", supportTimestamp=" + supportTimestamp
+        + ", decodePartition=" + decodePartition
+        + ", useFileListingFromMetadata=" + useFileListingFromMetadata
+        + ", verifyMetadataFileListing=" + verifyMetadataFileListing
+        + '}';
   }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 88f4c10..5c0c128 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -76,7 +76,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
   private HiveConf configuration;
 
   public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
-    super(cfg.basePath, cfg.assumeDatePartitioning, fs);
+    super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, fs);
     this.syncConfig = cfg;
     this.fs = fs;
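
[Illustration, not part of the patch] The two new flags travel from HiveSyncConfig through the HoodieHiveClient constructor into AbstractSyncHoodieClient, where they drive partition listing. A minimal sketch of programmatic use; the JDBC endpoint, database, table, and path are assumed placeholder values:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;

public class HiveSyncExample {
  public static void main(String[] args) throws Exception {
    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.basePath = "hdfs:///tmp/hudi_trips_cow";  // assumed table path
    cfg.databaseName = "default";                 // assumed target database
    cfg.tableName = "hudi_trips_cow";             // assumed target table
    cfg.jdbcUrl = "jdbc:hive2://localhost:10000"; // assumed HiveServer2 endpoint
    // New in this commit: list partitions from the metadata table, optionally
    // cross-checking that listing against the actual file system.
    cfg.useFileListingFromMetadata = true;
    cfg.verifyMetadataFileListing = false;

    HiveConf hiveConf = new HiveConf();
    FileSystem fs = new Path(cfg.basePath).getFileSystem(hiveConf);
    // The constructor forwards both flags to AbstractSyncHoodieClient, which uses
    // them in FSUtils.getAllPartitionPaths(...) when no prior sync time is known.
    HoodieHiveClient client = new HoodieHiveClient(cfg, hiveConf, fs);
  }
}

The equivalent command-line toggles are --use-file-listing-from-metadata and --verify-metadata-file-listing, as declared above.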
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index 419ea16..8c91848 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -40,18 +40,25 @@ import java.util.List;
 import java.util.Map;
 
 public abstract class AbstractSyncHoodieClient {
+
   private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+
   protected final HoodieTableMetaClient metaClient;
   protected final HoodieTableType tableType;
   protected final FileSystem fs;
   private String basePath;
   private boolean assumeDatePartitioning;
+  private boolean useFileListingFromMetadata;
+  private boolean verifyMetadataFileListing;
 
-  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
+                                  boolean verifyMetadataFileListing, FileSystem fs) {
     this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
     this.tableType = metaClient.getTableType();
     this.basePath = basePath;
     this.assumeDatePartitioning = assumeDatePartitioning;
+    this.useFileListingFromMetadata = useFileListingFromMetadata;
+    this.verifyMetadataFileListing = verifyMetadataFileListing;
     this.fs = fs;
   }
@@ -120,7 +127,7 @@ public abstract class AbstractSyncHoodieClient {
     if (!lastCommitTimeSynced.isPresent()) {
       LOG.info("Last commit time synced is not known, listing all partitions in " + basePath + ",FS :" + fs);
       try {
-        return FSUtils.getAllPartitionPaths(fs, basePath, assumeDatePartitioning);
+        return FSUtils.getAllPartitionPaths(fs, basePath, useFileListingFromMetadata, verifyMetadataFileListing, assumeDatePartitioning);
       } catch (IOException e) {
        throw new HoodieIOException("Failed to list all partitions in " + basePath, e);
       }
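
[Illustration, not part of the patch] The call that the sync clients and the utilities below all converge on is the extended FSUtils.getAllPartitionPaths overload. A minimal sketch of a direct caller; the path and flag values are chosen only for illustration:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.fs.FSUtils;

public class ListPartitionsExample {
  public static void main(String[] args) throws IOException {
    String basePath = "hdfs:///tmp/hudi_trips_cow"; // assumed table path
    FileSystem fs = FSUtils.getFs(basePath, new Configuration());
    // New signature: (fs, basePath, useFileListingFromMetadata, verifyMetadataFileListing, assumeDatePartitioning).
    // With the first flag true, partitions come from the table's internal metadata table;
    // with the second flag true, that listing is verified against the file system.
    List<String> partitions = FSUtils.getAllPartitionPaths(fs, basePath, true, false, false);
    // Returned paths are relative to the base path, as the test code above shows.
    partitions.forEach(p -> System.out.println(basePath + "/" + p));
  }
}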
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index 2826108..0066d86 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -68,10 +69,18 @@ public class HoodieSnapshotCopier implements Serializable {
 
     @Parameter(names = {"--date-partitioned", "-dp"}, description = "Can we assume date partitioning?")
     boolean shouldAssumeDatePartitioning = false;
+
+    @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+    public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+    @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
+    public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
   }
 
   public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir,
-                       final boolean shouldAssumeDatePartitioning) throws IOException {
+                       final boolean shouldAssumeDatePartitioning,
+                       final boolean useFileListingFromMetadata,
+                       final boolean verifyMetadataFileListing) throws IOException {
     FileSystem fs = FSUtils.getFs(baseDir, jsc.hadoopConfiguration());
     final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
     final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), baseDir);
@@ -88,7 +97,7 @@ public class HoodieSnapshotCopier implements Serializable {
       LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
           latestCommitTimestamp));
 
-      List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir, shouldAssumeDatePartitioning);
+      List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir, useFileListingFromMetadata, verifyMetadataFileListing, shouldAssumeDatePartitioning);
       if (partitions.size() > 0) {
         LOG.info(String.format("The job needs to copy %d partitions.", partitions.size()));
@@ -183,7 +192,8 @@ public class HoodieSnapshotCopier implements Serializable {
 
     // Copy
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc, cfg.basePath, cfg.outputPath, cfg.shouldAssumeDatePartitioning);
+    copier.snapshot(jsc, cfg.basePath, cfg.outputPath, cfg.shouldAssumeDatePartitioning, cfg.useFileListingFromMetadata,
+        cfg.verifyMetadataFileListing);
 
     // Stop the job
     jsc.stop();
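
[Illustration, not part of the patch] Callers of the copier now pass the two listing flags explicitly, as the main() change above shows. A short sketch of a programmatic invocation of the extended signature, under an assumed local Spark master and placeholder paths:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.hudi.utilities.HoodieSnapshotCopier;

public class SnapshotCopierExample {
  public static void main(String[] args) throws Exception {
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setAppName("snapshot-copier").setMaster("local[2]")); // assumed local run
    try {
      HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
      // The last two arguments are new: fetch the partition listing from the
      // metadata table, and (optionally) verify it against the file system.
      copier.snapshot(jsc, "hdfs:///tmp/hudi_trips_cow", "hdfs:///tmp/hudi_trips_snapshot",
          false, true, false);
    } finally {
      jsc.stop();
    }
  }
}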
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index c69d004..2f5f461 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -154,7 +154,7 @@ public class HoodieSnapshotExporter {
   }
 
   private List<String> getPartitions(FileSystem fs, Config cfg) throws IOException {
-    return FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, false);
+    return FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, true, false, false);
   }
 
   private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index f338e52..27dc709 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.perf;
 
 import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -85,7 +86,8 @@ public class TimelineServerPerf implements Serializable {
 
   public void run() throws IOException {
-    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(timelineServer.getFs(), cfg.basePath, true);
+    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(timelineServer.getFs(), cfg.basePath, cfg.useFileListingFromMetadata,
+        cfg.verifyMetadataFileListing, true);
     Collections.shuffle(allPartitionPaths);
     List<String> selected = allPartitionPaths.stream().filter(p -> !p.contains("error")).limit(cfg.maxPartitions)
         .collect(Collectors.toList());
@@ -294,6 +296,12 @@ public class TimelineServerPerf implements Serializable {
     @Parameter(names = {"--wait-for-manual-queries", "-ww"})
     public Boolean waitForManualQueries = false;
 
+    @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+    public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+    @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
+    public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
     @Parameter(names = {"--help", "-h"})
     public Boolean help = false;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
index 95af888..0b19fa0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.functional;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -67,7 +68,9 @@ public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
 
     // Do the snapshot
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc(), basePath, outputPath, true);
+    copier.snapshot(jsc(), basePath, outputPath, true,
+        HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE);
 
     // Nothing changed; we just bail out
     assertEquals(fs.listStatus(new Path(basePath)).length, 1);
@@ -120,7 +123,8 @@ public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
 
     // Do a snapshot copy
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc(), basePath, outputPath, false);
+    copier.snapshot(jsc(), basePath, outputPath, false, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE);
 
     // Check results
     assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));
