This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 5ee71fd7f34af5b82b7047a6d686b12419940adf Author: Vova Kolmakov <[email protected]> AuthorDate: Thu Apr 25 12:40:43 2024 +0700 [HUDI-7645] Optimize BQ sync tool for MDT (#11065) --- .../org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java | 12 +++++++----- .../hudi/sync/common/util/ManifestFileWriter.java | 18 +++++++++++++----- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java index 6e064dd59c6..466627dc701 100644 --- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java +++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java @@ -54,6 +54,8 @@ import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_ public class BigQuerySyncTool extends HoodieSyncTool { private static final Logger LOG = LoggerFactory.getLogger(BigQuerySyncTool.class); + private static final String SUFFIX_MANIFEST = "_manifest"; + private static final String SUFFIX_VERSIONS = "_versions"; private final BigQuerySyncConfig config; private final String tableName; @@ -70,8 +72,8 @@ public class BigQuerySyncTool extends HoodieSyncTool { super(props); this.config = new BigQuerySyncConfig(props); this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME); - this.manifestTableName = tableName + "_manifest"; - this.versionsTableName = tableName + "_versions"; + this.manifestTableName = tableName + SUFFIX_MANIFEST; + this.versionsTableName = tableName + SUFFIX_VERSIONS; this.snapshotViewName = tableName; this.bqSyncClient = new HoodieBigQuerySyncClient(config); // reuse existing meta client if not provided (only test cases will provide their own meta client) @@ -86,8 +88,8 @@ public class BigQuerySyncTool extends HoodieSyncTool { super(properties); this.config = new BigQuerySyncConfig(props); this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME); - this.manifestTableName = tableName + "_manifest"; - this.versionsTableName = tableName + "_versions"; + this.manifestTableName = tableName + SUFFIX_MANIFEST; + this.versionsTableName = tableName + SUFFIX_VERSIONS; this.snapshotViewName = tableName; this.bqSyncClient = bigQuerySyncClient; this.metaClient = metaClient; @@ -117,7 +119,7 @@ public class BigQuerySyncTool extends HoodieSyncTool { private boolean tableExists(HoodieBigQuerySyncClient bqSyncClient, String tableName) { if (bqSyncClient.tableExists(tableName)) { - LOG.info(tableName + " already exists. Skip table creation."); + LOG.info("{} already exists. Skip table creation.", tableName); return true; } return false; diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java index ae7580fa9f3..6f7f4bb2c1f 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java @@ -69,7 +69,7 @@ public class ManifestFileWriter { LOG.warn("No base file to generate manifest file."); return; } else { - LOG.info("Writing base file names to manifest file: " + baseFiles.size()); + LOG.info("Writing base file names to manifest file: {}", baseFiles.size()); } final StoragePath manifestFilePath = getManifestFilePath(useAbsolutePath); try (OutputStream outputStream = metaClient.getStorage().create(manifestFilePath, true); @@ -87,15 +87,23 @@ public class ManifestFileWriter { public static Stream<String> fetchLatestBaseFilesForAllPartitions(HoodieTableMetaClient metaClient, boolean useFileListingFromMetadata, boolean assumeDatePartitioning, boolean useAbsolutePath) { try { - List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getHadoopConf()), - metaClient.getBasePath(), useFileListingFromMetadata, assumeDatePartitioning); - LOG.info("Retrieve all partitions: " + partitions.size()); Configuration hadoopConf = metaClient.getHadoopConf(); HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf); HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).withAssumeDatePartitioning(assumeDatePartitioning).build()); - return partitions.parallelStream().flatMap(partition -> fsView.getLatestBaseFiles(partition).map(useAbsolutePath ? HoodieBaseFile::getPath : HoodieBaseFile::getFileName)); + Stream<HoodieBaseFile> allLatestBaseFiles; + if (useFileListingFromMetadata) { + LOG.info("Fetching all base files from MDT."); + fsView.loadAllPartitions(); + allLatestBaseFiles = fsView.getLatestBaseFiles(); + } else { + List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getHadoopConf()), + metaClient.getBasePathV2().toString(), false, assumeDatePartitioning); + LOG.info("Retrieve all partitions from fs: {}", partitions.size()); + allLatestBaseFiles = partitions.parallelStream().flatMap(fsView::getLatestBaseFiles); + } + return allLatestBaseFiles.map(useAbsolutePath ? HoodieBaseFile::getPath : HoodieBaseFile::getFileName); } catch (Exception e) { throw new HoodieException("Error in fetching latest base files.", e); }
