This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/rfc-15 by this push:
new c6c7e18 [HUDI-1292] [RFC-15] Use metadata table (if present) to get all partition paths (#2351)
c6c7e18 is described below
commit c6c7e18701b4d5a9435451b537b973be05342511
Author: Udit Mehrotra <[email protected]>
AuthorDate: Sun Dec 20 11:29:16 2020 -0800
[HUDI-1292] [RFC-15] Use metadata table (if present) to get all partition paths (#2351)
---
.../hudi/index/bloom/HoodieGlobalBloomIndex.java | 2 +-
.../hudi/index/simple/HoodieGlobalSimpleIndex.java | 3 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 7 ++-
.../compact/HoodieMergeOnReadTableCompactor.java | 2 +-
.../CopyOnWriteRollbackActionExecutor.java | 2 +-
.../hudi/table/action/rollback/RollbackUtils.java | 10 ++--
.../action/savepoint/SavepointActionExecutor.java | 2 +-
.../table/upgrade/ZeroToOneUpgradeHandler.java | 2 +-
.../org/apache/hudi/client/TestClientRollback.java | 5 +-
.../apache/hudi/metadata/TestHoodieFsMetadata.java | 2 +-
.../java/org/apache/hudi/common/fs/FSUtils.java | 10 +++-
.../metadata/FileSystemBackedTableMetadata.java | 69 ++++++++++++++++++++++
.../hudi/metadata/HoodieBackedTableMetadata.java | 9 ++-
.../reader/DFSHoodieDatasetInputReader.java | 4 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 4 +-
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 4 +-
.../java/org/apache/hudi/client/TestBootstrap.java | 19 ++++--
.../java/org/apache/hudi/dla/DLASyncConfig.java | 12 ++++
.../java/org/apache/hudi/dla/HoodieDLAClient.java | 3 +-
.../java/org/apache/hudi/hive/HiveSyncConfig.java | 13 +++-
.../org/apache/hudi/hive/HoodieHiveClient.java | 2 +-
.../hudi/sync/common/AbstractSyncHoodieClient.java | 9 ++-
.../hudi/utilities/HoodieSnapshotCopier.java | 15 ++++-
.../hudi/utilities/HoodieSnapshotExporter.java | 2 +-
.../hudi/utilities/perf/TimelineServerPerf.java | 10 +++-
.../functional/TestHoodieSnapshotCopier.java | 7 ++-
26 files changed, 184 insertions(+), 45 deletions(-)
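For readers skimming the diff below, the central change is the new signature of FSUtils.getAllPartitionPaths, which threads two metadata-table flags through every listing call site. A minimal before/after sketch (fs and basePath assumed in scope; in production code the flags come from HoodieWriteConfig or HoodieMetadataConfig defaults):

    // Before this commit: a single date-partitioning flag.
    List<String> pathsBefore = FSUtils.getAllPartitionPaths(fs, basePath, false);

    // After this commit: metadata-table-based listing can be enabled, and
    // optionally verified against an actual file system listing.
    List<String> pathsAfter = FSUtils.getAllPartitionPaths(fs, basePath,
        true /* useFileListingFromMetadata */, false /* verifyListings */,
        false /* assumeDatePartitioning */);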
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
index 4f93b30..be389ed 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
@@ -64,7 +64,7 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
try {
       List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
-          config.shouldAssumeDatePartitioning());
+          config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
return super.loadInvolvedFiles(allPartitionPaths, jsc, hoodieTable);
} catch (IOException e) {
throw new HoodieIOException("Failed to load all partitions", e);
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
index 990f02d..1f3e954 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
@@ -100,7 +100,8 @@ public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends Hood
   protected List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(final JavaSparkContext jsc, final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
try {
-      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning());
+      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
+          config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
       // Obtain the latest data files from all the partitions.
       return getLatestBaseFilesForAllPartitions(allPartitionPaths, jsc, hoodieTable);
} catch (IOException e) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 45e4806..b560428 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -312,15 +312,16 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
// List all partitions in the basePath of the containing dataset
FileSystem fs = datasetMetaClient.getFs();
-    List<String> partitions = FSUtils.getAllPartitionPaths(fs, datasetWriteConfig.getBasePath(), datasetWriteConfig.shouldAssumeDatePartitioning());
+    FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(),
+        datasetWriteConfig.shouldAssumeDatePartitioning());
+    List<String> partitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
     LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
     // List all partitions in parallel and collect the files in them
     int parallelism = Math.min(partitions.size(), jsc.defaultParallelism()) + 1; // +1 to prevent 0 parallelism
     JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
         .mapToPair(partition -> {
-          FileSystem fsys = datasetMetaClient.getFs();
-          FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new Path(datasetWriteConfig.getBasePath(), partition));
+          FileStatus[] statuses = fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition));
           return new Tuple2<>(partition, statuses);
         });
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
index c4343f8..4a3402a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
@@ -180,7 +180,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
LOG.info("Compacting " + metaClient.getBasePath() + " with commit " +
compactionCommitTime);
List<String> partitionPaths =
FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
- config.shouldAssumeDatePartitioning());
+ config.useFileListingMetadata(),
config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
// filter the partition paths if needed to reduce list status
partitionPaths =
config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
index b06dc05..6e1cc1c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
@@ -91,7 +91,7 @@ public class CopyOnWriteRollbackActionExecutor extends BaseRollbackActionExecuto
@Override
   protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
     List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-        config.shouldAssumeDatePartitioning());
+        config);
     return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(jsc, instantToRollback, rollbackRequests);
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index 3bfd645..b3c91e4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -88,12 +88,14 @@ public class RollbackUtils {
    * Generate all rollback requests that needs rolling back this action without actually performing rollback for COW table type.
    * @param fs instance of {@link FileSystem} to use.
    * @param basePath base path of interest.
-   * @param shouldAssumeDatePartitioning {@code true} if date partitioning should be assumed. {@code false} otherwise.
+   * @param config instance of {@link HoodieWriteConfig} to use.
    * @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected.
    */
-  public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(FileSystem fs, String basePath, boolean shouldAssumeDatePartitioning) {
+  public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(FileSystem fs, String basePath,
+                                                                                       HoodieWriteConfig config) {
     try {
-      return FSUtils.getAllPartitionPaths(fs, basePath, shouldAssumeDatePartitioning).stream()
+      return FSUtils.getAllPartitionPaths(fs, basePath, config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
+          config.shouldAssumeDatePartitioning()).stream()
           .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
           .collect(Collectors.toList());
} catch (IOException e) {
@@ -113,7 +115,7 @@ public class RollbackUtils {
String commit = instantToRollback.getTimestamp();
HoodieWriteConfig config = table.getConfig();
     List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-        config.shouldAssumeDatePartitioning());
+        config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
     int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
     jsc.setJobGroup(RollbackUtils.class.getSimpleName(), "Generate all rollback requests");
     return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
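Callers of generateRollbackRequestsByListingCOW now hand over the whole write config instead of a lone boolean; a sketch mirroring the changed call sites in this diff (table assumed in scope):

    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
        table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), table.getConfig());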
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index ac95118..07ca044 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -89,7 +89,7 @@ public class SavepointActionExecutor extends BaseActionExecutor<HoodieSavepointM
       jsc.setJobGroup(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
       Map<String, List<String>> latestFilesMap = jsc.parallelize(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
-          table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()))
+          table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()))
           .mapToPair(partitionPath -> {
             // Scan all partitions files with this commit time
             LOG.info("Collecting latest files in partition path " + partitionPath);
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index e9c9e28..3ac85b3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -89,7 +89,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
List<ListingBasedRollbackRequest> rollbackRequests;
     if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
       rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-          table.getConfig().shouldAssumeDatePartitioning());
+          table.getConfig());
     } else {
       rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, jsc);
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index da4c002..5431d0e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -99,8 +99,9 @@ public class TestClientRollback extends HoodieClientTestBase {
       statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
-      List<String> partitionPaths =
-          FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
+      HoodieWriteConfig config = getConfig();
+      List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), config.useFileListingMetadata(),
+          config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable table = HoodieTable.create(metaClient, getConfig(), hadoopConf);
final BaseFileOnlyView view1 = table.getBaseFileOnlyView();
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
index edd8359..2823035 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
@@ -718,7 +718,7 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
     // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters all directory
     // in the .hoodie folder.
     List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(fs, HoodieTableMetadata.getMetadataTableBasePath(basePath),
-        false);
+        false, false, false);
     assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());
// Metadata table should automatically compact and clean
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index c2338cf..0a84d13 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -28,6 +28,8 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.InvalidHoodiePathException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -248,12 +250,14 @@ public class FSUtils {
}
}
-  public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr, boolean assumeDatePartitioning)
-      throws IOException {
+  public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr, boolean useFileListingFromMetadata, boolean verifyListings,
+                                                  boolean assumeDatePartitioning) throws IOException {
if (assumeDatePartitioning) {
return getAllPartitionFoldersThreeLevelsDown(fs, basePathStr);
} else {
-      return getAllFoldersWithPartitionMetaFile(fs, basePathStr);
+      HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(fs.getConf(), basePathStr, "/tmp/", useFileListingFromMetadata,
+          verifyListings, false, false);
+      return tableMetadata.getAllPartitionPaths();
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
new file mode 100644
index 0000000..73ce8e4
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
+
+ private final SerializableConfiguration hadoopConf;
+ private final String datasetBasePath;
+ private final boolean assumeDatePartitioning;
+
+  public FileSystemBackedTableMetadata(SerializableConfiguration conf, String datasetBasePath, boolean assumeDatePartitioning) {
+ this.hadoopConf = conf;
+ this.datasetBasePath = datasetBasePath;
+ this.assumeDatePartitioning = assumeDatePartitioning;
+ }
+
+ @Override
+  public FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException {
+ FileSystem fs = partitionPath.getFileSystem(hadoopConf.get());
+ return FSUtils.getAllDataFilesInPartition(fs, partitionPath);
+ }
+
+ @Override
+ public List<String> getAllPartitionPaths() throws IOException {
+ FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
+ if (assumeDatePartitioning) {
+      return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
+ } else {
+ return FSUtils.getAllFoldersWithPartitionMetaFile(fs, datasetBasePath);
+ }
+ }
+
+ @Override
+ public Option<String> getSyncedInstantTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isInSync() {
+ throw new UnsupportedOperationException();
+ }
+}
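The new class above is the pure file-system fallback behind both FSUtils and the metadata writer. A hedged usage sketch (the table and partition paths are illustrative, everything else comes from this diff):

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.config.SerializableConfiguration;
    import org.apache.hudi.metadata.FileSystemBackedTableMetadata;

    public class ListPartitionsExample {
      public static void main(String[] args) throws Exception {
        // "/data/hudi/trips" is a hypothetical table base path.
        SerializableConfiguration conf = new SerializableConfiguration(new Configuration());
        FileSystemBackedTableMetadata fsMetadata =
            new FileSystemBackedTableMetadata(conf, "/data/hudi/trips", false /* assumeDatePartitioning */);
        // Scans for partition metafiles rather than consulting the metadata table.
        List<String> partitions = fsMetadata.getAllPartitionPaths();
        FileStatus[] files = fsMetadata.getAllFilesInPartition(new Path("/data/hudi/trips/2020/12/20"));
        System.out.println(partitions.size() + " partitions, " + files.length + " files in the listed one");
      }
    }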
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index ca7c27a..3a1c7bf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -30,7 +30,6 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -108,7 +107,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
     try {
       this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath);
     } catch (TableNotFoundException e) {
-      LOG.error("Metadata table was not found at path " + metadataBasePath);
+      LOG.warn("Metadata table was not found at path " + metadataBasePath + ", hence not using it.");
       this.enabled = false;
     } catch (Exception e) {
       LOG.error("Failed to initialize metadata table at path " + metadataBasePath, e);
@@ -145,8 +144,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
       }
     }
-    FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf.get());
-    return FSUtils.getAllPartitionPaths(fs, datasetBasePath, assumeDatePartitioning);
+    return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning).getAllPartitionPaths();
}
/**
@@ -199,7 +197,8 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
     if (validateLookups) {
       // Validate the Metadata Table data by listing the partitions from the file system
       timer.startTimer();
-      List<String> actualPartitions = FSUtils.getAllPartitionPaths(metaClient.getFs(), datasetBasePath, false);
+      FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning);
+      List<String> actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
       metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer()));
Collections.sort(actualPartitions);
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 209aa46..1690368 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -38,6 +38,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
@@ -79,7 +80,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
   protected List<String> getPartitions(Option<Integer> partitionsLimit) throws IOException {
     List<String> partitionPaths = FSUtils
-        .getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), false);
+        .getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false);
// Sort partition so we can pick last N partitions by default
Collections.sort(partitionPaths);
if (!partitionPaths.isEmpty()) {
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index df7960e..5360afe 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteClient, HoodieWriteResult}
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
@@ -341,6 +341,8 @@ private[hudi] object HoodieSparkSqlWriter {
       ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
     hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
     hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
+    hiveSyncConfig.useFileListingFromMetadata = parameters(HoodieMetadataConfig.METADATA_ENABLE_PROP).toBoolean
+    hiveSyncConfig.verifyMetadataFileListing = parameters(HoodieMetadataConfig.METADATA_VALIDATE_PROP).toBoolean
hiveSyncConfig
}
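The two properties forwarded into hiveSyncConfig above are always present because HoodieWriterUtils (next diff) adds them to the default write parameters. A hedged sketch of setting them explicitly from a Spark job, referencing only the constants added in this commit (df and basePath assumed in scope):

    df.write().format("hudi")
        .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, "true")    // enable metadata-based listing
        .option(HoodieMetadataConfig.METADATA_VALIDATE_PROP, "false") // skip validation against the FS
        .mode(SaveMode.Append)
        .save(basePath);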
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 294050b..d8540ce 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -18,7 +18,7 @@
package org.apache.hudi
import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters.mapAsScalaMapConverter
@@ -46,6 +46,8 @@ object HoodieWriterUtils {
RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
+    HoodieMetadataConfig.METADATA_ENABLE_PROP -> HoodieMetadataConfig.DEFAULT_METADATA_ENABLE.toString,
+    HoodieMetadataConfig.METADATA_VALIDATE_PROP -> HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE.toString,
     COMMIT_METADATA_KEYPREFIX_OPT_KEY -> DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL,
INSERT_DROP_DUPS_OPT_KEY -> DEFAULT_INSERT_DROP_DUPS_OPT_VAL,
STREAMING_RETRY_CNT_OPT_KEY -> DEFAULT_STREAMING_RETRY_CNT_OPT_VAL,
diff --git a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
index 14f36d4..2f45c0c 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
+++ b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
@@ -27,6 +27,7 @@ import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
 import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
@@ -371,7 +372,8 @@ public class TestBootstrap extends HoodieClientTestBase {
reloadInputFormats();
     List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
assertEquals(totalRecords, records.size());
@@ -389,7 +391,8 @@ public class TestBootstrap extends HoodieClientTestBase {
seenKeys = new HashSet<>();
records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
        basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
assertEquals(totalRecords, records.size());
@@ -405,7 +408,8 @@ public class TestBootstrap extends HoodieClientTestBase {
reloadInputFormats();
records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true, HoodieRecord.HOODIE_META_COLUMNS);
@@ -422,7 +426,8 @@ public class TestBootstrap extends HoodieClientTestBase {
seenKeys = new HashSet<>();
records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
        basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true, HoodieRecord.HOODIE_META_COLUMNS);
@@ -437,7 +442,8 @@ public class TestBootstrap extends HoodieClientTestBase {
reloadInputFormats();
records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true, Arrays.asList("_row_key"));
@@ -454,7 +460,8 @@ public class TestBootstrap extends HoodieClientTestBase {
seenKeys = new HashSet<>();
records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
        basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true, Arrays.asList("_row_key"));
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
index 988e9cd..4cf755d 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
@@ -19,6 +19,8 @@
package org.apache.hudi.dla;
import com.beust.jcommander.Parameter;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import java.io.Serializable;
@@ -65,6 +67,12 @@ public class DLASyncConfig implements Serializable {
@Parameter(names = {"--hive-style-partitioning"}, description = "Use DLA
hive style partitioning, true if like the following style:
field1=value1/field2=value2")
public Boolean useDLASyncHiveStylePartitioning = false;
+ @Parameter(names = {"--use-file-listing-from-metadata"}, description =
"Fetch file listing from Hudi's metadata")
+ public Boolean useFileListingFromMetadata =
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+ @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify
file listing from Hudi's metadata against file system")
+ public Boolean verifyMetadataFileListing =
HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
@@ -81,6 +89,8 @@ public class DLASyncConfig implements Serializable {
newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
newConfig.skipROSuffix = cfg.skipROSuffix;
     newConfig.useDLASyncHiveStylePartitioning = cfg.useDLASyncHiveStylePartitioning;
+ newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
+ newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
return newConfig;
}
@@ -91,6 +101,8 @@ public class DLASyncConfig implements Serializable {
+ ", basePath='" + basePath + '\'' + ", partitionFields=" +
partitionFields + ", partitionValueExtractorClass='"
+ partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" +
assumeDatePartitioning
+ ", useDLASyncHiveStylePartitioning=" +
useDLASyncHiveStylePartitioning
+ + ", useFileListingFromMetadata=" + useFileListingFromMetadata
+ + ", verifyMetadataFileListing=" + verifyMetadataFileListing
+ ", help=" + help + '}';
}
}
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
index 34a96c9..02c07d6 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
@@ -70,7 +70,8 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
   private PartitionValueExtractor partitionValueExtractor;
   public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) {
-    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, fs);
+    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
+        syncConfig.verifyMetadataFileListing, fs);
this.dlaConfig = syncConfig;
try {
this.partitionValueExtractor =
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index c861a53..a49bdef 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -18,6 +18,8 @@
package org.apache.hudi.hive;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+
import com.beust.jcommander.Parameter;
import java.io.Serializable;
@@ -77,6 +79,12 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro`
suffix for Read optimized table, when registering")
public Boolean skipROSuffix = false;
+ @Parameter(names = {"--use-file-listing-from-metadata"}, description =
"Fetch file listing from Hudi's metadata")
+ public Boolean useFileListingFromMetadata =
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+ @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify
file listing from Hudi's metadata against file system")
+ public Boolean verifyMetadataFileListing =
HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
@@ -92,6 +100,8 @@ public class HiveSyncConfig implements Serializable {
newConfig.jdbcUrl = cfg.jdbcUrl;
newConfig.tableName = cfg.tableName;
newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
+ newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
+ newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
return newConfig;
}
@@ -101,6 +111,7 @@ public class HiveSyncConfig implements Serializable {
+ ", hiveUser='" + hiveUser + '\'' + ", hivePass='" + hivePass + '\''
+ ", jdbcUrl='" + jdbcUrl + '\''
+ ", basePath='" + basePath + '\'' + ", partitionFields=" +
partitionFields + ", partitionValueExtractorClass='"
+ partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" +
assumeDatePartitioning
- + ", usePreApacheInputFormat=" + usePreApacheInputFormat + ",
useJdbc=" + useJdbc + ", help=" + help + '}';
+ + ", usePreApacheInputFormat=" + usePreApacheInputFormat + ",
useJdbc=" + useJdbc + ", useFileListingFromMetadata="
+ + useFileListingFromMetadata + ", verifyMetadataFileListing=" +
verifyMetadataFileListing + ", help=" + help + '}';
}
}
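Since the new HiveSyncConfig fields are public JCommander-bound members, they can be set either via the --use-file-listing-from-metadata / --verify-metadata-file-listing flags or programmatically; a minimal sketch under the assumption that hiveConf and fs are already in scope (the base path is illustrative):

    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.basePath = "hdfs://namenode/data/hudi/trips";   // illustrative table path
    cfg.useFileListingFromMetadata = true;              // new in this commit
    cfg.verifyMetadataFileListing = false;              // new in this commit
    HoodieHiveClient client = new HoodieHiveClient(cfg, hiveConf, fs);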
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index eb8b867..bce8104 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -84,7 +84,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
   private HiveConf configuration;
   public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
-    super(cfg.basePath, cfg.assumeDatePartitioning, fs);
+    super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, fs);
this.syncConfig = cfg;
this.fs = fs;
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index 419ea16..219106c 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -46,12 +46,17 @@ public abstract class AbstractSyncHoodieClient {
protected final FileSystem fs;
private String basePath;
private boolean assumeDatePartitioning;
+ private boolean useFileListingFromMetadata;
+ private boolean verifyMetadataFileListing;
-  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
+                                  boolean verifyMetadataFileListing, FileSystem fs) {
this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
this.tableType = metaClient.getTableType();
this.basePath = basePath;
this.assumeDatePartitioning = assumeDatePartitioning;
+ this.useFileListingFromMetadata = useFileListingFromMetadata;
+ this.verifyMetadataFileListing = verifyMetadataFileListing;
this.fs = fs;
}
@@ -120,7 +125,7 @@ public abstract class AbstractSyncHoodieClient {
if (!lastCommitTimeSynced.isPresent()) {
LOG.info("Last commit time synced is not known, listing all partitions
in " + basePath + ",FS :" + fs);
try {
- return FSUtils.getAllPartitionPaths(fs, basePath,
assumeDatePartitioning);
+ return FSUtils.getAllPartitionPaths(fs, basePath,
useFileListingFromMetadata, verifyMetadataFileListing, assumeDatePartitioning);
} catch (IOException e) {
throw new HoodieIOException("Failed to list all partitions in " +
basePath, e);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index 916d019..501246e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -18,6 +18,7 @@
package org.apache.hudi.utilities;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -66,10 +67,17 @@ public class HoodieSnapshotCopier implements Serializable {
@Parameter(names = {"--date-partitioned", "-dp"}, description = "Can we
assume date partitioning?")
boolean shouldAssumeDatePartitioning = false;
+
+ @Parameter(names = {"--use-file-listing-from-metadata"}, description =
"Fetch file listing from Hudi's metadata")
+ public Boolean useFileListingFromMetadata =
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+ @Parameter(names = {"--verify-metadata-file-listing"}, description =
"Verify file listing from Hudi's metadata against file system")
+ public Boolean verifyMetadataFileListing =
HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
}
   public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir,
-      final boolean shouldAssumeDatePartitioning) throws IOException {
+      final boolean shouldAssumeDatePartitioning, final boolean useFileListingFromMetadata,
+      final boolean verifyMetadataFileListing) throws IOException {
     FileSystem fs = FSUtils.getFs(baseDir, jsc.hadoopConfiguration());
     final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
     final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), baseDir);
@@ -86,7 +94,7 @@ public class HoodieSnapshotCopier implements Serializable {
LOG.info(String.format("Starting to snapshot latest version files which
are also no-late-than %s.",
latestCommitTimestamp));
- List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir,
shouldAssumeDatePartitioning);
+ List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir,
useFileListingFromMetadata, verifyMetadataFileListing,
shouldAssumeDatePartitioning);
if (partitions.size() > 0) {
LOG.info(String.format("The job needs to copy %d partitions.",
partitions.size()));
@@ -177,7 +185,8 @@ public class HoodieSnapshotCopier implements Serializable {
// Copy
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc, cfg.basePath, cfg.outputPath, cfg.shouldAssumeDatePartitioning);
+    copier.snapshot(jsc, cfg.basePath, cfg.outputPath, cfg.shouldAssumeDatePartitioning, cfg.useFileListingFromMetadata,
+        cfg.verifyMetadataFileListing);
// Stop the job
jsc.stop();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index 0743839..7ecc8ba 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -152,7 +152,7 @@ public class HoodieSnapshotExporter {
}
   private List<String> getPartitions(FileSystem fs, Config cfg) throws IOException {
-    return FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, false);
+    return FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, true, false, false);
}
private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index 6aaa6bd..cd15434 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -18,6 +18,7 @@
package org.apache.hudi.utilities.perf;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -83,7 +84,8 @@ public class TimelineServerPerf implements Serializable {
public void run() throws IOException {
-    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(timelineServer.getFs(), cfg.basePath, true);
+    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(timelineServer.getFs(), cfg.basePath, cfg.useFileListingFromMetadata,
+        cfg.verifyMetadataFileListing, true);
     Collections.shuffle(allPartitionPaths);
     List<String> selected = allPartitionPaths.stream().filter(p -> !p.contains("error")).limit(cfg.maxPartitions)
.collect(Collectors.toList());
@@ -291,6 +293,12 @@ public class TimelineServerPerf implements Serializable {
@Parameter(names = {"--wait-for-manual-queries", "-ww"})
public Boolean waitForManualQueries = false;
+ @Parameter(names = {"--use-file-listing-from-metadata"}, description =
"Fetch file listing from Hudi's metadata")
+ public Boolean useFileListingFromMetadata =
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+ @Parameter(names = {"--verify-metadata-file-listing"}, description =
"Verify file listing from Hudi's metadata against file system")
+ public Boolean verifyMetadataFileListing =
HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
@Parameter(names = {"--help", "-h"})
public Boolean help = false;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
index 95af888..0116ad4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
@@ -18,6 +18,7 @@
package org.apache.hudi.utilities.functional;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -67,7 +68,8 @@ public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
     // Do the snapshot
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc(), basePath, outputPath, true);
+    copier.snapshot(jsc(), basePath, outputPath, true, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE);
// Nothing changed; we just bail out
assertEquals(fs.listStatus(new Path(basePath)).length, 1);
@@ -120,7 +122,8 @@ public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
     // Do a snapshot copy
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc(), basePath, outputPath, false);
+    copier.snapshot(jsc(), basePath, outputPath, false, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE);
// Check results
    assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));