This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6d35691b6a8ccc46baff3ce92a021c0db8ad0934
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri May 10 09:57:02 2024 -0700

    [HUDI-7654] Optimizing BQ sync for MDT (#11061)
    
    * Optimizing BQ sync for MDT
    
    * Adding tests
---
 .../hudi/sync/common/util/ManifestFileWriter.java  |  51 ++++++---
 .../utilities/TestManifestFileWriterSpark.java     | 117 +++++++++++++++++++++
 2 files changed, 151 insertions(+), 17 deletions(-)

diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
index ea6fa8dc5f9..20addf80d56 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
@@ -19,13 +19,17 @@
 package org.apache.hudi.sync.common.util;
 
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.hadoop.fs.Path;
@@ -83,25 +87,14 @@ public class ManifestFileWriter {
     }
   }
 
+  @VisibleForTesting
   public static Stream<String> 
fetchLatestBaseFilesForAllPartitions(HoodieTableMetaClient metaClient,
       boolean useFileListingFromMetadata, boolean assumeDatePartitioning, 
boolean useAbsolutePath) {
     try {
-      HoodieLocalEngineContext engContext = new 
HoodieLocalEngineContext(metaClient.getStorageConf());
-      HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(engContext, metaClient,
-          
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
-          
HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).withAssumeDatePartitioning(assumeDatePartitioning).build());
-      Stream<HoodieBaseFile> allLatestBaseFiles;
-      if (useFileListingFromMetadata) {
-        LOG.info("Fetching all base files from MDT.");
-        fsView.loadAllPartitions();
-        allLatestBaseFiles = fsView.getLatestBaseFiles();
-      } else {
-        List<String> partitions = FSUtils.getAllPartitionPaths(new 
HoodieLocalEngineContext(metaClient.getStorageConf()),
-            metaClient.getBasePathV2().toString(), false, 
assumeDatePartitioning);
-        LOG.info("Retrieve all partitions from fs: {}", partitions.size());
-        allLatestBaseFiles =  
partitions.parallelStream().flatMap(fsView::getLatestBaseFiles);
-      }
-      return allLatestBaseFiles.map(useAbsolutePath ? HoodieBaseFile::getPath 
: HoodieBaseFile::getFileName);
+      StorageConfiguration storageConf = metaClient.getStorageConf();
+      HoodieLocalEngineContext engContext = new 
HoodieLocalEngineContext(storageConf);
+      boolean canUseMetadataTable = useFileListingFromMetadata && 
metaClient.getTableConfig().isMetadataTableAvailable();
+      return getLatestBaseFiles(canUseMetadataTable, engContext, metaClient, 
useAbsolutePath);
     } catch (Exception e) {
       throw new HoodieException("Error in fetching latest base files.", e);
     }
@@ -111,6 +104,30 @@ public class ManifestFileWriter {
     return new StoragePath(metaClient.getMetaPath(), useAbsolutePath ? 
ABSOLUTE_PATH_MANIFEST_FOLDER_NAME : MANIFEST_FOLDER_NAME);
   }
 
+  @VisibleForTesting
+  static Stream<String> getLatestBaseFiles(boolean canUseMetadataTable, 
HoodieEngineContext engContext, HoodieTableMetaClient metaClient,
+                                           boolean useAbsolutePath) {
+    List<String> partitions = FSUtils.getAllPartitionPaths(engContext, 
metaClient.getBasePath(), canUseMetadataTable, false);
+    LOG.info("Retrieve all partitions: " + partitions.size());
+    HoodieTableFileSystemView fsView = null;
+    try {
+      fsView = 
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engContext, 
metaClient,
+          
HoodieMetadataConfig.newBuilder().enable(canUseMetadataTable).build(),
+          
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
+      if (canUseMetadataTable) {
+        // In case of MDT, we can load all partitions at once. If not for MDT, 
we can rely on fsView.getLatestBaseFiles(partition) for each partition to load 
from FS.
+        fsView.loadAllPartitions();
+      }
+      HoodieTableFileSystemView finalFsView = fsView;
+      // If we do not collect and instead return the stream directly, lazy 
evaluation happens and we end up closing the fsView in the finally block, which 
+      // later fails the getLatestBaseFiles call. Hence we collect and return a 
stream.
+      return partitions.parallelStream().flatMap(partition -> 
finalFsView.getLatestBaseFiles(partition)
+          .map(useAbsolutePath ? HoodieBaseFile::getPath : 
HoodieBaseFile::getFileName)).collect(Collectors.toList()).stream();
+    } finally {
+      fsView.close();
+    }
+  }
+
   public StoragePath getManifestFilePath(boolean useAbsolutePath) {
     return new StoragePath(getManifestFolder(useAbsolutePath), 
MANIFEST_FILE_NAME);
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestManifestFileWriterSpark.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestManifestFileWriterSpark.java
new file mode 100644
index 00000000000..3a750dda54a
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestManifestFileWriterSpark.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.sync.common.util.ManifestFileWriter;
+import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestManifestFileWriterSpark extends HoodieSparkClientTestHarness {
+
+  protected HoodieTableType tableType;
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    this.tableType = HoodieTableType.COPY_ON_WRITE;
+    initPath();
+    initSparkContexts("TestHoodieMetadata");
+    initHoodieStorage();
+    initMetaClient(tableType);
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupResources();
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testCreateManifestFile(boolean enableMetadata) throws Exception {
+    HoodieWriteConfig writeConfig = getWriteConfig(basePath, enableMetadata);
+
+    // Generate data files for 3 partitions.
+    createTestDataForPartitionedTable(metaClient, enableMetadata, context, 
context.getStorageConf(), writeConfig);
+    ManifestFileWriter manifestFileWriter = 
ManifestFileWriter.builder().setMetaClient(metaClient).build();
+    manifestFileWriter.writeManifestFile(false);
+    StoragePath manifestFilePath = 
manifestFileWriter.getManifestFilePath(false);
+    try (InputStream is = metaClient.getStorage().open(manifestFilePath)) {
+      List<String> expectedLines = FileIOUtils.readAsUTFStringLines(is);
+      assertEquals(9, expectedLines.size(), "there should be 9 base files in 
total; 3 per partition.");
+      expectedLines.forEach(line -> assertFalse(line.contains(basePath)));
+    }
+  }
+
+  private static void createTestDataForPartitionedTable(HoodieTableMetaClient 
metaClient,
+                                                        boolean 
enableMetadata, HoodieEngineContext context, StorageConfiguration 
storageConfiguration,
+                                                        HoodieWriteConfig 
writeConfig) throws Exception {
+    final String instantTime = "100";
+    HoodieTestTable testTable = null;
+    if (enableMetadata) {
+      HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(storageConfiguration, writeConfig, 
context);
+      // reload because table configs could have been updated
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, 
Option.of(context));
+    } else {
+      testTable = HoodieTestTable.of(metaClient);
+    }
+    doWriteOperation(testTable, instantTime);
+  }
+
+  private HoodieWriteConfig getWriteConfig(String basePath, boolean 
enableMetadata) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath)
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build()).build();
+  }
+
+  protected static void doWriteOperation(HoodieTestTable testTable, String 
commitTime) throws Exception {
+    doWriteOperation(testTable, commitTime, UPSERT);
+  }
+
+  protected static void doWriteOperation(HoodieTestTable testTable, String 
commitTime, WriteOperationType operationType) throws Exception {
+    testTable.withPartitionMetaFiles("p1", "p2", "p3");
+    testTable.doWriteOperation(commitTime, operationType, emptyList(), 
asList("p1", "p2", "p3"), 3);
+  }
+}

Reply via email to