This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 6d35691b6a8ccc46baff3ce92a021c0db8ad0934 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Fri May 10 09:57:02 2024 -0700 [HUDI-7654] Optimizing BQ sync for MDT (#11061) * Optimizing BQ sync for MDT * Adding tests --- .../hudi/sync/common/util/ManifestFileWriter.java | 51 ++++++--- .../utilities/TestManifestFileWriterSpark.java | 117 +++++++++++++++++++++ 2 files changed, 151 insertions(+), 17 deletions(-) diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java index ea6fa8dc5f9..20addf80d56 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java @@ -19,13 +19,17 @@ package org.apache.hudi.sync.common.util; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.FileSystemViewManager; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.metadata.HoodieMetadataFileSystemView; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hadoop.fs.Path; @@ -83,25 +87,14 @@ public class ManifestFileWriter { } } + @VisibleForTesting public static Stream<String> fetchLatestBaseFilesForAllPartitions(HoodieTableMetaClient metaClient, boolean useFileListingFromMetadata, boolean 
assumeDatePartitioning, boolean useAbsolutePath) { try { - HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(metaClient.getStorageConf()); - HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient, - metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), - HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).withAssumeDatePartitioning(assumeDatePartitioning).build()); - Stream<HoodieBaseFile> allLatestBaseFiles; - if (useFileListingFromMetadata) { - LOG.info("Fetching all base files from MDT."); - fsView.loadAllPartitions(); - allLatestBaseFiles = fsView.getLatestBaseFiles(); - } else { - List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getStorageConf()), - metaClient.getBasePathV2().toString(), false, assumeDatePartitioning); - LOG.info("Retrieve all partitions from fs: {}", partitions.size()); - allLatestBaseFiles = partitions.parallelStream().flatMap(fsView::getLatestBaseFiles); - } - return allLatestBaseFiles.map(useAbsolutePath ? HoodieBaseFile::getPath : HoodieBaseFile::getFileName); + StorageConfiguration storageConf = metaClient.getStorageConf(); + HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(storageConf); + boolean canUseMetadataTable = useFileListingFromMetadata && metaClient.getTableConfig().isMetadataTableAvailable(); + return getLatestBaseFiles(canUseMetadataTable, engContext, metaClient, useAbsolutePath); } catch (Exception e) { throw new HoodieException("Error in fetching latest base files.", e); } @@ -111,6 +104,30 @@ public class ManifestFileWriter { return new StoragePath(metaClient.getMetaPath(), useAbsolutePath ? 
ABSOLUTE_PATH_MANIFEST_FOLDER_NAME : MANIFEST_FOLDER_NAME); } + @VisibleForTesting + static Stream<String> getLatestBaseFiles(boolean canUseMetadataTable, HoodieEngineContext engContext, HoodieTableMetaClient metaClient, + boolean useAbsolutePath) { + List<String> partitions = FSUtils.getAllPartitionPaths(engContext, metaClient.getBasePath(), canUseMetadataTable, false); + LOG.info("Retrieve all partitions: " + partitions.size()); + HoodieTableFileSystemView fsView = null; + try { + fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engContext, metaClient, + HoodieMetadataConfig.newBuilder().enable(canUseMetadataTable).build(), + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()); + if (canUseMetadataTable) { + // in case of MDT, we can load all partitions at once. If not for MDT, we can rely on fsView.getLatestBaseFiles(partition) for each partition to load from FS. + fsView.loadAllPartitions(); + } + HoodieTableFileSystemView finalFsView = fsView; + // if we do not collect and return the stream directly, lazy evaluation happens and we end up closing the fsView in the finally block, which later + fails the getLatestBaseFiles call. Hence we collect and return a stream. + return partitions.parallelStream().flatMap(partition -> finalFsView.getLatestBaseFiles(partition) + .map(useAbsolutePath ?
HoodieBaseFile::getPath : HoodieBaseFile::getFileName)).collect(Collectors.toList()).stream(); + } finally { + fsView.close(); + } + } + public StoragePath getManifestFilePath(boolean useAbsolutePath) { return new StoragePath(getManifestFolder(useAbsolutePath), MANIFEST_FILE_NAME); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestManifestFileWriterSpark.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestManifestFileWriterSpark.java new file mode 100644 index 00000000000..3a750dda54a --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestManifestFileWriterSpark.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities; + +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.testutils.HoodieMetadataTestTable; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataWriter; +import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.sync.common.util.ManifestFileWriter; +import org.apache.hudi.testutils.HoodieSparkClientTestHarness; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static org.apache.hudi.common.model.WriteOperationType.UPSERT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class TestManifestFileWriterSpark extends HoodieSparkClientTestHarness { + + protected HoodieTableType tableType; + + @BeforeEach + public void setUp() throws IOException { + this.tableType = HoodieTableType.COPY_ON_WRITE; + initPath(); + initSparkContexts("TestHoodieMetadata"); + initHoodieStorage(); + initMetaClient(tableType); + } + + @AfterEach + public void tearDown() throws IOException { + cleanupResources(); + } + + @ParameterizedTest + @ValueSource(booleans = 
{false, true}) + public void testCreateManifestFile(boolean enableMetadata) throws Exception { + HoodieWriteConfig writeConfig = getWriteConfig(basePath, enableMetadata); + + // Generate data files for 3 partitions. + createTestDataForPartitionedTable(metaClient, enableMetadata, context, context.getStorageConf(), writeConfig); + ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setMetaClient(metaClient).build(); + manifestFileWriter.writeManifestFile(false); + StoragePath manifestFilePath = manifestFileWriter.getManifestFilePath(false); + try (InputStream is = metaClient.getStorage().open(manifestFilePath)) { + List<String> expectedLines = FileIOUtils.readAsUTFStringLines(is); + assertEquals(9, expectedLines.size(), "there should be 9 base files in total; 3 per partition."); + expectedLines.forEach(line -> assertFalse(line.contains(basePath))); + } + } + + private static void createTestDataForPartitionedTable(HoodieTableMetaClient metaClient, + boolean enableMetadata, HoodieEngineContext context, StorageConfiguration storageConfiguration, + HoodieWriteConfig writeConfig) throws Exception { + final String instantTime = "100"; + HoodieTestTable testTable = null; + if (enableMetadata) { + HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConfiguration, writeConfig, context); + // reload because table configs could have been updated + metaClient = HoodieTableMetaClient.reload(metaClient); + testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context)); + } else { + testTable = HoodieTestTable.of(metaClient); + } + doWriteOperation(testTable, instantTime); + } + + private HoodieWriteConfig getWriteConfig(String basePath, boolean enableMetadata) { + return HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build()).build(); + } + + protected static void doWriteOperation(HoodieTestTable testTable, String 
commitTime) throws Exception { + doWriteOperation(testTable, commitTime, UPSERT); + } + + protected static void doWriteOperation(HoodieTestTable testTable, String commitTime, WriteOperationType operationType) throws Exception { + testTable.withPartitionMetaFiles("p1", "p2", "p3"); + testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2", "p3"), 3); + } +}
