This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c75ae7f9ba2 [HUDI-7654] Optimizing BQ sync for MDT (#11061)
c75ae7f9ba2 is described below
commit c75ae7f9ba21bbf9ad9101415d632ed91af27712
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri May 10 09:57:02 2024 -0700
[HUDI-7654] Optimizing BQ sync for MDT (#11061)
* Optimizing BQ sync for MDT
* Adding tests
---
.../hudi/sync/common/util/ManifestFileWriter.java | 51 ++++++---
.../utilities/TestManifestFileWriterSpark.java | 117 +++++++++++++++++++++
2 files changed, 151 insertions(+), 17 deletions(-)
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
index 2a15997ab21..3eaf80dddfe 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
@@ -19,13 +19,17 @@
package org.apache.hudi.sync.common.util;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hadoop.fs.Path;
@@ -81,25 +85,14 @@ public class ManifestFileWriter {
}
}
+ @VisibleForTesting
public static Stream<String>
fetchLatestBaseFilesForAllPartitions(HoodieTableMetaClient metaClient,
boolean useFileListingFromMetadata, boolean useAbsolutePath) {
try {
- HoodieLocalEngineContext engContext = new
HoodieLocalEngineContext(metaClient.getStorageConf());
- HoodieMetadataFileSystemView fsView = new
HoodieMetadataFileSystemView(engContext, metaClient,
-
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
-
HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).build());
- Stream<HoodieBaseFile> allLatestBaseFiles;
- if (useFileListingFromMetadata) {
- LOG.info("Fetching all base files from MDT.");
- fsView.loadAllPartitions();
- allLatestBaseFiles = fsView.getLatestBaseFiles();
- } else {
- List<String> partitions = FSUtils.getAllPartitionPaths(new
HoodieLocalEngineContext(metaClient.getStorageConf()),
- metaClient.getBasePathV2().toString(), false);
- LOG.info("Retrieve all partitions from fs: {}", partitions.size());
- allLatestBaseFiles =
partitions.parallelStream().flatMap(fsView::getLatestBaseFiles);
- }
- return allLatestBaseFiles.map(useAbsolutePath ? HoodieBaseFile::getPath
: HoodieBaseFile::getFileName);
+ StorageConfiguration storageConf = metaClient.getStorageConf();
+ HoodieLocalEngineContext engContext = new
HoodieLocalEngineContext(storageConf);
+ boolean canUseMetadataTable = useFileListingFromMetadata &&
metaClient.getTableConfig().isMetadataTableAvailable();
+ return getLatestBaseFiles(canUseMetadataTable, engContext, metaClient,
useAbsolutePath);
} catch (Exception e) {
throw new HoodieException("Error in fetching latest base files.", e);
}
@@ -109,6 +102,30 @@ public class ManifestFileWriter {
return new StoragePath(metaClient.getMetaPath(), useAbsolutePath ?
ABSOLUTE_PATH_MANIFEST_FOLDER_NAME : MANIFEST_FOLDER_NAME);
}
+ @VisibleForTesting
+ static Stream<String> getLatestBaseFiles(boolean canUseMetadataTable,
HoodieEngineContext engContext, HoodieTableMetaClient metaClient,
+ boolean useAbsolutePath) {
+ List<String> partitions = FSUtils.getAllPartitionPaths(engContext,
metaClient.getBasePath(), canUseMetadataTable);
+ LOG.info("Retrieve all partitions: " + partitions.size());
+ HoodieTableFileSystemView fsView = null;
+ try {
+ fsView =
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engContext,
metaClient,
+
HoodieMetadataConfig.newBuilder().enable(canUseMetadataTable).build(),
+
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
+ if (canUseMetadataTable) {
+        // in case of MDT, we can load all partitions at once. If not for MDT,
we can rely on fsView.getLatestBaseFiles(partition) for each partition to load
from FS.
+ fsView.loadAllPartitions();
+ }
+ HoodieTableFileSystemView finalFsView = fsView;
+      // if we do not collect and instead return the stream directly, lazy evaluation
happens and we end up closing the fsView in the finally block, which later
+      // fails the getLatestBaseFiles call. Hence we collect and return a
stream.
+ return partitions.parallelStream().flatMap(partition ->
finalFsView.getLatestBaseFiles(partition)
+ .map(useAbsolutePath ? HoodieBaseFile::getPath :
HoodieBaseFile::getFileName)).collect(Collectors.toList()).stream();
+ } finally {
+ fsView.close();
+ }
+ }
+
public StoragePath getManifestFilePath(boolean useAbsolutePath) {
return new StoragePath(getManifestFolder(useAbsolutePath),
MANIFEST_FILE_NAME);
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestManifestFileWriterSpark.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestManifestFileWriterSpark.java
new file mode 100644
index 00000000000..3a750dda54a
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestManifestFileWriterSpark.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.sync.common.util.ManifestFileWriter;
+import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestManifestFileWriterSpark extends HoodieSparkClientTestHarness {
+
+ protected HoodieTableType tableType;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ this.tableType = HoodieTableType.COPY_ON_WRITE;
+ initPath();
+ initSparkContexts("TestHoodieMetadata");
+ initHoodieStorage();
+ initMetaClient(tableType);
+ }
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ cleanupResources();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testCreateManifestFile(boolean enableMetadata) throws Exception {
+ HoodieWriteConfig writeConfig = getWriteConfig(basePath, enableMetadata);
+
+ // Generate data files for 3 partitions.
+ createTestDataForPartitionedTable(metaClient, enableMetadata, context,
context.getStorageConf(), writeConfig);
+ ManifestFileWriter manifestFileWriter =
ManifestFileWriter.builder().setMetaClient(metaClient).build();
+ manifestFileWriter.writeManifestFile(false);
+ StoragePath manifestFilePath =
manifestFileWriter.getManifestFilePath(false);
+ try (InputStream is = metaClient.getStorage().open(manifestFilePath)) {
+ List<String> expectedLines = FileIOUtils.readAsUTFStringLines(is);
+ assertEquals(9, expectedLines.size(), "there should be 9 base files in
total; 3 per partition.");
+ expectedLines.forEach(line -> assertFalse(line.contains(basePath)));
+ }
+ }
+
+ private static void createTestDataForPartitionedTable(HoodieTableMetaClient
metaClient,
+ boolean
enableMetadata, HoodieEngineContext context, StorageConfiguration
storageConfiguration,
+ HoodieWriteConfig
writeConfig) throws Exception {
+ final String instantTime = "100";
+ HoodieTestTable testTable = null;
+ if (enableMetadata) {
+ HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(storageConfiguration, writeConfig,
context);
+ // reload because table configs could have been updated
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter,
Option.of(context));
+ } else {
+ testTable = HoodieTestTable.of(metaClient);
+ }
+ doWriteOperation(testTable, instantTime);
+ }
+
+ private HoodieWriteConfig getWriteConfig(String basePath, boolean
enableMetadata) {
+ return HoodieWriteConfig.newBuilder().withPath(basePath)
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build()).build();
+ }
+
+ protected static void doWriteOperation(HoodieTestTable testTable, String
commitTime) throws Exception {
+ doWriteOperation(testTable, commitTime, UPSERT);
+ }
+
+ protected static void doWriteOperation(HoodieTestTable testTable, String
commitTime, WriteOperationType operationType) throws Exception {
+ testTable.withPartitionMetaFiles("p1", "p2", "p3");
+ testTable.doWriteOperation(commitTime, operationType, emptyList(),
asList("p1", "p2", "p3"), 3);
+ }
+}