This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/rfc-15 by this push:
     new c6c7e18  [HUDI-1292] [RFC-15] Use metadata table (if present) to get all partition paths (#2351)
c6c7e18 is described below

commit c6c7e18701b4d5a9435451b537b973be05342511
Author: Udit Mehrotra <[email protected]>
AuthorDate: Sun Dec 20 11:29:16 2020 -0800

    [HUDI-1292] [RFC-15] Use metadata table (if present) to get all partition paths (#2351)
---
 .../hudi/index/bloom/HoodieGlobalBloomIndex.java   |  2 +-
 .../hudi/index/simple/HoodieGlobalSimpleIndex.java |  3 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  7 ++-
 .../compact/HoodieMergeOnReadTableCompactor.java   |  2 +-
 .../CopyOnWriteRollbackActionExecutor.java         |  2 +-
 .../hudi/table/action/rollback/RollbackUtils.java  | 10 ++--
 .../action/savepoint/SavepointActionExecutor.java  |  2 +-
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |  2 +-
 .../org/apache/hudi/client/TestClientRollback.java |  5 +-
 .../apache/hudi/metadata/TestHoodieFsMetadata.java |  2 +-
 .../java/org/apache/hudi/common/fs/FSUtils.java    | 10 +++-
 .../metadata/FileSystemBackedTableMetadata.java    | 69 ++++++++++++++++++++++
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  9 ++-
 .../reader/DFSHoodieDatasetInputReader.java        |  4 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  4 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  4 +-
 .../java/org/apache/hudi/client/TestBootstrap.java | 19 ++++--
 .../java/org/apache/hudi/dla/DLASyncConfig.java    | 12 ++++
 .../java/org/apache/hudi/dla/HoodieDLAClient.java  |  3 +-
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  | 13 +++-
 .../org/apache/hudi/hive/HoodieHiveClient.java     |  2 +-
 .../hudi/sync/common/AbstractSyncHoodieClient.java |  9 ++-
 .../hudi/utilities/HoodieSnapshotCopier.java       | 15 ++++-
 .../hudi/utilities/HoodieSnapshotExporter.java     |  2 +-
 .../hudi/utilities/perf/TimelineServerPerf.java    | 10 +++-
 .../functional/TestHoodieSnapshotCopier.java       |  7 ++-
 26 files changed, 184 insertions(+), 45 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
index 4f93b30..be389ed 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
@@ -64,7 +64,7 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     try {
      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
-          config.shouldAssumeDatePartitioning());
+          config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
       return super.loadInvolvedFiles(allPartitionPaths, jsc, hoodieTable);
     } catch (IOException e) {
       throw new HoodieIOException("Failed to load all partitions", e);
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
index 990f02d..1f3e954 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
@@ -100,7 +100,8 @@ public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends Hood
   protected List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(final JavaSparkContext jsc, final HoodieTable hoodieTable) {
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     try {
-      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning());
+      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
+          config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
       // Obtain the latest data files from all the partitions.
       return getLatestBaseFilesForAllPartitions(allPartitionPaths, jsc, hoodieTable);
     } catch (IOException e) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 45e4806..b560428 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -312,15 +312,16 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
 
     // List all partitions in the basePath of the containing dataset
     FileSystem fs = datasetMetaClient.getFs();
-    List<String> partitions = FSUtils.getAllPartitionPaths(fs, datasetWriteConfig.getBasePath(), datasetWriteConfig.shouldAssumeDatePartitioning());
+    FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(),
+        datasetWriteConfig.shouldAssumeDatePartitioning());
+    List<String> partitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
     LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
 
     // List all partitions in parallel and collect the files in them
     int parallelism =  Math.min(partitions.size(), jsc.defaultParallelism()) + 1; // +1 to prevent 0 parallelism
     JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
         .mapToPair(partition -> {
-          FileSystem fsys = datasetMetaClient.getFs();
-          FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new Path(datasetWriteConfig.getBasePath(), partition));
+          FileStatus[] statuses = fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition));
           return new Tuple2<>(partition, statuses);
         });
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
index c4343f8..4a3402a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
@@ -180,7 +180,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
     List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
-        config.shouldAssumeDatePartitioning());
+        config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
 
     // filter the partition paths if needed to reduce list status
     partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
index b06dc05..6e1cc1c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
@@ -91,7 +91,7 @@ public class CopyOnWriteRollbackActionExecutor extends BaseRollbackActionExecuto
   @Override
   protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
     List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-        config.shouldAssumeDatePartitioning());
+        config);
     return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(jsc, instantToRollback, rollbackRequests);
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index 3bfd645..b3c91e4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -88,12 +88,14 @@ public class RollbackUtils {
    * Generate all rollback requests that needs rolling back this action without actually performing rollback for COW table type.
    * @param fs instance of {@link FileSystem} to use.
    * @param basePath base path of interest.
-   * @param shouldAssumeDatePartitioning {@code true} if date partitioning should be assumed. {@code false} otherwise.
+   * @param config instance of {@link HoodieWriteConfig} to use.
    * @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected.
    */
-  public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(FileSystem fs, String basePath, boolean shouldAssumeDatePartitioning) {
+  public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(FileSystem fs, String basePath,
+      HoodieWriteConfig config) {
     try {
-      return FSUtils.getAllPartitionPaths(fs, basePath, shouldAssumeDatePartitioning).stream()
+      return FSUtils.getAllPartitionPaths(fs, basePath, config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
+          config.shouldAssumeDatePartitioning()).stream()
           .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
           .collect(Collectors.toList());
     } catch (IOException e) {
@@ -113,7 +115,7 @@ public class RollbackUtils {
     String commit = instantToRollback.getTimestamp();
     HoodieWriteConfig config = table.getConfig();
     List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-        config.shouldAssumeDatePartitioning());
+        config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
     int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
     jsc.setJobGroup(RollbackUtils.class.getSimpleName(), "Generate all rollback requests");
     return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index ac95118..07ca044 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -89,7 +89,7 @@ public class SavepointActionExecutor extends BaseActionExecutor<HoodieSavepointM
 
       jsc.setJobGroup(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
       Map<String, List<String>> latestFilesMap = jsc.parallelize(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
-          table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()))
+          table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()))
           .mapToPair(partitionPath -> {
             // Scan all partitions files with this commit time
             LOG.info("Collecting latest files in partition path " + partitionPath);
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index e9c9e28..3ac85b3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -89,7 +89,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
         List<ListingBasedRollbackRequest> rollbackRequests;
         if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
           rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-              table.getConfig().shouldAssumeDatePartitioning());
+              table.getConfig());
         } else {
           rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, jsc);
         }
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index da4c002..5431d0e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -99,8 +99,9 @@ public class TestClientRollback extends HoodieClientTestBase {
       statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       // Verify there are no errors
       assertNoWriteErrors(statuses);
-      List<String> partitionPaths =
-          FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
+      HoodieWriteConfig config = getConfig();
+      List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), config.useFileListingMetadata(),
+          config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable table = HoodieTable.create(metaClient, getConfig(), hadoopConf);
       final BaseFileOnlyView view1 = table.getBaseFileOnlyView();
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
index edd8359..2823035 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
@@ -718,7 +718,7 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
     // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters all directory
     // in the .hoodie folder.
     List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(fs, HoodieTableMetadata.getMetadataTableBasePath(basePath),
-        false);
+        false, false, false);
     assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());
 
     // Metadata table should automatically compact and clean
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index c2338cf..0a84d13 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -28,6 +28,8 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.InvalidHoodiePathException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -248,12 +250,14 @@ public class FSUtils {
     }
   }
 
-  public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr, boolean assumeDatePartitioning)
-      throws IOException {
+  public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr, boolean useFileListingFromMetadata, boolean verifyListings,
+                                                  boolean assumeDatePartitioning) throws IOException {
     if (assumeDatePartitioning) {
       return getAllPartitionFoldersThreeLevelsDown(fs, basePathStr);
     } else {
-      return getAllFoldersWithPartitionMetaFile(fs, basePathStr);
+      HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(fs.getConf(), basePathStr, "/tmp/", useFileListingFromMetadata,
+          verifyListings, false, false);
+      return tableMetadata.getAllPartitionPaths();
     }
   }
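
For downstream callers, the widened FSUtils.getAllPartitionPaths signature above takes the two metadata flags ahead of the date-partitioning flag. A minimal sketch of a migrated call site, assuming an existing FileSystem fs and a table basePath (the flag values are illustrative, not taken from this commit):

    // Illustrative only: prefer the metadata table for listing, skip the
    // file-system cross-check, and make no date-partitioning assumption.
    List<String> partitions = FSUtils.getAllPartitionPaths(
        fs,        // org.apache.hadoop.fs.FileSystem of the table
        basePath,  // table base path
        true,      // useFileListingFromMetadata
        false,     // verifyListings
        false);    // assumeDatePartitioning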
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
new file mode 100644
index 0000000..73ce8e4
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
+
+  private final SerializableConfiguration hadoopConf;
+  private final String datasetBasePath;
+  private final boolean assumeDatePartitioning;
+
+  public FileSystemBackedTableMetadata(SerializableConfiguration conf, String datasetBasePath, boolean assumeDatePartitioning) {
+    this.hadoopConf = conf;
+    this.datasetBasePath = datasetBasePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+  }
+
+  @Override
+  public FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException {
+    FileSystem fs = partitionPath.getFileSystem(hadoopConf.get());
+    return FSUtils.getAllDataFilesInPartition(fs, partitionPath);
+  }
+
+  @Override
+  public List<String> getAllPartitionPaths() throws IOException {
+    FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
+    if (assumeDatePartitioning) {
+      return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
+    } else {
+      return FSUtils.getAllFoldersWithPartitionMetaFile(fs, datasetBasePath);
+    }
+  }
+
+  @Override
+  public Option<String> getSyncedInstantTime() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isInSync() {
+    throw new UnsupportedOperationException();
+  }
+}
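
A brief usage sketch of the new FileSystemBackedTableMetadata; the path and the Hadoop configuration handle are hypothetical, not code from this commit:

    // Hypothetical usage: list partitions and files straight off the file
    // system, without consulting the metadata table.
    SerializableConfiguration conf = new SerializableConfiguration(hadoopConf);
    FileSystemBackedTableMetadata fsMetadata =
        new FileSystemBackedTableMetadata(conf, "/data/hudi/trips", false);
    List<String> partitions = fsMetadata.getAllPartitionPaths();
    FileStatus[] files = fsMetadata.getAllFilesInPartition(
        new Path("/data/hudi/trips", partitions.get(0)));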
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index ca7c27a..3a1c7bf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -30,7 +30,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -108,7 +107,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
       try {
         this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath);
       } catch (TableNotFoundException e) {
-        LOG.error("Metadata table was not found at path " + metadataBasePath);
+        LOG.warn("Metadata table was not found at path " + metadataBasePath + ", hence not using it.");
         this.enabled = false;
       } catch (Exception e) {
         LOG.error("Failed to initialize metadata table at path " + metadataBasePath, e);
@@ -145,8 +144,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
       }
     }
 
-    FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf.get());
-    return FSUtils.getAllPartitionPaths(fs, datasetBasePath, assumeDatePartitioning);
+    return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning).getAllPartitionPaths();
   }
 
   /**
@@ -199,7 +197,8 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
     if (validateLookups) {
       // Validate the Metadata Table data by listing the partitions from the file system
       timer.startTimer();
-      List<String> actualPartitions  = FSUtils.getAllPartitionPaths(metaClient.getFs(), datasetBasePath, false);
+      FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning);
+      List<String> actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
       metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer()));
 
       Collections.sort(actualPartitions);
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 209aa46..1690368 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -38,6 +38,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -79,7 +80,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
 
   protected List<String> getPartitions(Option<Integer> partitionsLimit) throws IOException {
     List<String> partitionPaths = FSUtils
-        .getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), false);
+        .getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false);
     // Sort partition so we can pick last N partitions by default
     Collections.sort(partitionPaths);
     if (!partitionPaths.isEmpty()) {
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index df7960e..5360afe 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.{HoodieWriteClient, HoodieWriteResult}
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
@@ -341,6 +341,8 @@ private[hudi] object HoodieSparkSqlWriter {
       ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
     hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
     hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
+    hiveSyncConfig.useFileListingFromMetadata = parameters(HoodieMetadataConfig.METADATA_ENABLE_PROP).toBoolean
+    hiveSyncConfig.verifyMetadataFileListing = parameters(HoodieMetadataConfig.METADATA_VALIDATE_PROP).toBoolean
     hiveSyncConfig
   }
 
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 294050b..d8540ce 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.hudi
 
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters.mapAsScalaMapConverter
@@ -46,6 +46,8 @@ object HoodieWriterUtils {
       RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
       PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
       KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
+      HoodieMetadataConfig.METADATA_ENABLE_PROP -> HoodieMetadataConfig.DEFAULT_METADATA_ENABLE.toString,
+      HoodieMetadataConfig.METADATA_VALIDATE_PROP -> HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE.toString,
       COMMIT_METADATA_KEYPREFIX_OPT_KEY -> DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL,
       INSERT_DROP_DUPS_OPT_KEY -> DEFAULT_INSERT_DROP_DUPS_OPT_VAL,
       STREAMING_RETRY_CNT_OPT_KEY -> DEFAULT_STREAMING_RETRY_CNT_OPT_VAL,
diff --git a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
index 14f36d4..2f45c0c 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
+++ b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
@@ -27,6 +27,7 @@ import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
 import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
 import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
@@ -371,7 +372,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
     assertEquals(totalRecords, records.size());
@@ -389,7 +391,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
     assertEquals(totalRecords, records.size());
@@ -405,7 +408,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES,
         true, HoodieRecord.HOODIE_META_COLUMNS);
@@ -422,7 +426,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
         HoodieRecord.HOODIE_META_COLUMNS);
@@ -437,7 +442,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true,
         Arrays.asList("_row_key"));
@@ -454,7 +460,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
         Arrays.asList("_row_key"));
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
index 988e9cd..4cf755d 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.dla;
 
 import com.beust.jcommander.Parameter;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 
 import java.io.Serializable;
@@ -65,6 +67,12 @@ public class DLASyncConfig implements Serializable {
   @Parameter(names = {"--hive-style-partitioning"}, description = "Use DLA hive style partitioning, true if like the following style: field1=value1/field2=value2")
   public Boolean useDLASyncHiveStylePartitioning = false;
 
+  @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+  public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+  @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
+  public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;
 
@@ -81,6 +89,8 @@ public class DLASyncConfig implements Serializable {
     newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
     newConfig.skipROSuffix = cfg.skipROSuffix;
     newConfig.useDLASyncHiveStylePartitioning = cfg.useDLASyncHiveStylePartitioning;
+    newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
+    newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
     return newConfig;
   }
 
@@ -91,6 +101,8 @@ public class DLASyncConfig implements Serializable {
         + ", basePath='" + basePath + '\'' + ", partitionFields=" + 
partitionFields + ", partitionValueExtractorClass='"
         + partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + 
assumeDatePartitioning
         + ", useDLASyncHiveStylePartitioning=" + 
useDLASyncHiveStylePartitioning
+        + ", useFileListingFromMetadata=" + useFileListingFromMetadata
+        + ", verifyMetadataFileListing=" + verifyMetadataFileListing
         + ", help=" + help + '}';
   }
 }
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
index 34a96c9..02c07d6 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
@@ -70,7 +70,8 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
   private PartitionValueExtractor partitionValueExtractor;
 
   public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) {
-    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, fs);
+    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
+        syncConfig.verifyMetadataFileListing, fs);
     this.dlaConfig = syncConfig;
     try {
       this.partitionValueExtractor =
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index c861a53..a49bdef 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.hive;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+
 import com.beust.jcommander.Parameter;
 
 import java.io.Serializable;
@@ -77,6 +79,12 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
   public Boolean skipROSuffix = false;
 
+  @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+  public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+  @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
+  public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;
 
@@ -92,6 +100,8 @@ public class HiveSyncConfig implements Serializable {
     newConfig.jdbcUrl = cfg.jdbcUrl;
     newConfig.tableName = cfg.tableName;
     newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
+    newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
+    newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
     return newConfig;
   }
 
@@ -101,6 +111,7 @@ public class HiveSyncConfig implements Serializable {
         + ", hiveUser='" + hiveUser + '\'' + ", hivePass='" + hivePass + '\'' + ", jdbcUrl='" + jdbcUrl + '\''
         + ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='"
         + partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning
-        + ", usePreApacheInputFormat=" + usePreApacheInputFormat + ", useJdbc=" + useJdbc + ", help=" + help + '}';
+        + ", usePreApacheInputFormat=" + usePreApacheInputFormat + ", useJdbc=" + useJdbc + ", useFileListingFromMetadata="
+        + useFileListingFromMetadata + ", verifyMetadataFileListing=" + verifyMetadataFileListing + ", help=" + help + '}';
   }
 }
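
For hive sync, the new flags can be set programmatically as well as through the new CLI options; a minimal sketch, assuming hiveConf and fs come from the caller and the base path is hypothetical:

    // Illustrative only: enable metadata-based listing during hive sync.
    HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
    hiveSyncConfig.basePath = "/data/hudi/trips";      // hypothetical table path
    hiveSyncConfig.useFileListingFromMetadata = true;  // list via the metadata table
    hiveSyncConfig.verifyMetadataFileListing = false;  // skip the file-system cross-check
    HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveConf, fs);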
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index eb8b867..bce8104 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -84,7 +84,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
   private HiveConf configuration;
 
   public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
-    super(cfg.basePath, cfg.assumeDatePartitioning, fs);
+    super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, fs);
     this.syncConfig = cfg;
     this.fs = fs;
 
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index 419ea16..219106c 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -46,12 +46,17 @@ public abstract class AbstractSyncHoodieClient {
   protected final FileSystem fs;
   private String basePath;
   private boolean assumeDatePartitioning;
+  private boolean useFileListingFromMetadata;
+  private boolean verifyMetadataFileListing;
 
-  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
+                                  boolean verifyMetadataFileListing, FileSystem fs) {
     this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
     this.tableType = metaClient.getTableType();
     this.basePath = basePath;
     this.assumeDatePartitioning = assumeDatePartitioning;
+    this.useFileListingFromMetadata = useFileListingFromMetadata;
+    this.verifyMetadataFileListing = verifyMetadataFileListing;
     this.fs = fs;
   }
 
@@ -120,7 +125,7 @@ public abstract class AbstractSyncHoodieClient {
     if (!lastCommitTimeSynced.isPresent()) {
       LOG.info("Last commit time synced is not known, listing all partitions in " + basePath + ",FS :" + fs);
       try {
-        return FSUtils.getAllPartitionPaths(fs, basePath, assumeDatePartitioning);
+        return FSUtils.getAllPartitionPaths(fs, basePath, useFileListingFromMetadata, verifyMetadataFileListing, assumeDatePartitioning);
       } catch (IOException e) {
         throw new HoodieIOException("Failed to list all partitions in " + basePath, e);
       }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index 916d019..501246e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -66,10 +67,17 @@ public class HoodieSnapshotCopier implements Serializable {
 
     @Parameter(names = {"--date-partitioned", "-dp"}, description = "Can we assume date partitioning?")
     boolean shouldAssumeDatePartitioning = false;
+
+    @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+    public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+    @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
+    public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
   }
 
   public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir,
-      final boolean shouldAssumeDatePartitioning) throws IOException {
+      final boolean shouldAssumeDatePartitioning, final boolean useFileListingFromMetadata,
+      final boolean verifyMetadataFileListing) throws IOException {
     FileSystem fs = FSUtils.getFs(baseDir, jsc.hadoopConfiguration());
     final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
     final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), baseDir);
@@ -86,7 +94,7 @@ public class HoodieSnapshotCopier implements Serializable {
     LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
         latestCommitTimestamp));
 
-    List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir, shouldAssumeDatePartitioning);
+    List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir, useFileListingFromMetadata, verifyMetadataFileListing, shouldAssumeDatePartitioning);
     if (partitions.size() > 0) {
       LOG.info(String.format("The job needs to copy %d partitions.", partitions.size()));
 
@@ -177,7 +185,8 @@ public class HoodieSnapshotCopier implements Serializable {
 
     // Copy
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc, cfg.basePath, cfg.outputPath, cfg.shouldAssumeDatePartitioning);
+    copier.snapshot(jsc, cfg.basePath, cfg.outputPath, cfg.shouldAssumeDatePartitioning, cfg.useFileListingFromMetadata,
+        cfg.verifyMetadataFileListing);
 
     // Stop the job
     jsc.stop();
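
Callers of snapshot(...) now pass the two metadata flags explicitly; a minimal sketch of the new call shape, assuming an existing JavaSparkContext jsc (the paths are hypothetical):

    // Illustrative only: snapshot with metadata-based listing enabled.
    HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
    copier.snapshot(jsc, "/data/hudi/trips", "/backups/hudi/trips",
        false,  // shouldAssumeDatePartitioning
        true,   // useFileListingFromMetadata
        false); // verifyMetadataFileListing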
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index 0743839..7ecc8ba 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -152,7 +152,7 @@ public class HoodieSnapshotExporter {
   }
 
   private List<String> getPartitions(FileSystem fs, Config cfg) throws IOException {
-    return FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, false);
+    return FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, true, false, false);
   }
 
   private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index 6aaa6bd..cd15434 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.perf;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -83,7 +84,8 @@ public class TimelineServerPerf implements Serializable {
 
   public void run() throws IOException {
 
-    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(timelineServer.getFs(), cfg.basePath, true);
+    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(timelineServer.getFs(), cfg.basePath, cfg.useFileListingFromMetadata,
+        cfg.verifyMetadataFileListing, true);
     Collections.shuffle(allPartitionPaths);
     List<String> selected = allPartitionPaths.stream().filter(p -> !p.contains("error")).limit(cfg.maxPartitions)
         .collect(Collectors.toList());
@@ -291,6 +293,12 @@ public class TimelineServerPerf implements Serializable {
     @Parameter(names = {"--wait-for-manual-queries", "-ww"})
     public Boolean waitForManualQueries = false;
 
+    @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+    public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+    @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
+    public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
     @Parameter(names = {"--help", "-h"})
     public Boolean help = false;
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
index 95af888..0116ad4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.functional;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -67,7 +68,8 @@ public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
 
     // Do the snapshot
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc(), basePath, outputPath, true);
+    copier.snapshot(jsc(), basePath, outputPath, true, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE);
 
     // Nothing changed; we just bail out
     assertEquals(fs.listStatus(new Path(basePath)).length, 1);
@@ -120,7 +122,8 @@ public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
 
     // Do a snapshot copy
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc(), basePath, outputPath, false);
+    copier.snapshot(jsc(), basePath, outputPath, false, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE);
 
     // Check results
     assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));
