This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 5ee71fd7f34af5b82b7047a6d686b12419940adf
Author: Vova Kolmakov <[email protected]>
AuthorDate: Thu Apr 25 12:40:43 2024 +0700

    [HUDI-7645] Optimize BQ sync tool for MDT (#11065)
---
 .../org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java | 12 +++++++-----
 .../hudi/sync/common/util/ManifestFileWriter.java      | 18 +++++++++++++-----
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
index 6e064dd59c6..466627dc701 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
@@ -54,6 +54,8 @@ import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_
 public class BigQuerySyncTool extends HoodieSyncTool {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BigQuerySyncTool.class);
+  private static final String SUFFIX_MANIFEST = "_manifest";
+  private static final String SUFFIX_VERSIONS = "_versions";
 
   private final BigQuerySyncConfig config;
   private final String tableName;
@@ -70,8 +72,8 @@ public class BigQuerySyncTool extends HoodieSyncTool {
     super(props);
     this.config = new BigQuerySyncConfig(props);
     this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME);
-    this.manifestTableName = tableName + "_manifest";
-    this.versionsTableName = tableName + "_versions";
+    this.manifestTableName = tableName + SUFFIX_MANIFEST;
+    this.versionsTableName = tableName + SUFFIX_VERSIONS;
     this.snapshotViewName = tableName;
     this.bqSyncClient = new HoodieBigQuerySyncClient(config);
     // reuse existing meta client if not provided (only test cases will 
provide their own meta client)
@@ -86,8 +88,8 @@ public class BigQuerySyncTool extends HoodieSyncTool {
     super(properties);
     this.config = new BigQuerySyncConfig(props);
     this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME);
-    this.manifestTableName = tableName + "_manifest";
-    this.versionsTableName = tableName + "_versions";
+    this.manifestTableName = tableName + SUFFIX_MANIFEST;
+    this.versionsTableName = tableName + SUFFIX_VERSIONS;
     this.snapshotViewName = tableName;
     this.bqSyncClient = bigQuerySyncClient;
     this.metaClient = metaClient;
@@ -117,7 +119,7 @@ public class BigQuerySyncTool extends HoodieSyncTool {
 
   private boolean tableExists(HoodieBigQuerySyncClient bqSyncClient, String 
tableName) {
     if (bqSyncClient.tableExists(tableName)) {
-      LOG.info(tableName + " already exists. Skip table creation.");
+      LOG.info("{} already exists. Skip table creation.", tableName);
       return true;
     }
     return false;
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
index ae7580fa9f3..6f7f4bb2c1f 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
@@ -69,7 +69,7 @@ public class ManifestFileWriter {
         LOG.warn("No base file to generate manifest file.");
         return;
       } else {
-        LOG.info("Writing base file names to manifest file: " + 
baseFiles.size());
+        LOG.info("Writing base file names to manifest file: {}", 
baseFiles.size());
       }
       final StoragePath manifestFilePath = 
getManifestFilePath(useAbsolutePath);
       try (OutputStream outputStream = 
metaClient.getStorage().create(manifestFilePath, true);
@@ -87,15 +87,23 @@ public class ManifestFileWriter {
   public static Stream<String> 
fetchLatestBaseFilesForAllPartitions(HoodieTableMetaClient metaClient,
       boolean useFileListingFromMetadata, boolean assumeDatePartitioning, 
boolean useAbsolutePath) {
     try {
-      List<String> partitions = FSUtils.getAllPartitionPaths(new 
HoodieLocalEngineContext(metaClient.getHadoopConf()),
-          metaClient.getBasePath(), useFileListingFromMetadata, 
assumeDatePartitioning);
-      LOG.info("Retrieve all partitions: " + partitions.size());
       Configuration hadoopConf = metaClient.getHadoopConf();
       HoodieLocalEngineContext engContext = new 
HoodieLocalEngineContext(hadoopConf);
       HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(engContext, metaClient,
           
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
           
HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).withAssumeDatePartitioning(assumeDatePartitioning).build());
-      return partitions.parallelStream().flatMap(partition -> 
fsView.getLatestBaseFiles(partition).map(useAbsolutePath ? 
HoodieBaseFile::getPath : HoodieBaseFile::getFileName));
+      Stream<HoodieBaseFile> allLatestBaseFiles;
+      if (useFileListingFromMetadata) {
+        LOG.info("Fetching all base files from MDT.");
+        fsView.loadAllPartitions();
+        allLatestBaseFiles = fsView.getLatestBaseFiles();
+      } else {
+        List<String> partitions = FSUtils.getAllPartitionPaths(new 
HoodieLocalEngineContext(metaClient.getHadoopConf()),
+            metaClient.getBasePathV2().toString(), false, 
assumeDatePartitioning);
+        LOG.info("Retrieve all partitions from fs: {}", partitions.size());
+        allLatestBaseFiles =  
partitions.parallelStream().flatMap(fsView::getLatestBaseFiles);
+      }
+      return allLatestBaseFiles.map(useAbsolutePath ? HoodieBaseFile::getPath 
: HoodieBaseFile::getFileName);
     } catch (Exception e) {
       throw new HoodieException("Error in fetching latest base files.", e);
     }

Reply via email to