This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 8d0e813967a29077cca52fca74e468db0cb2bc24
Author: Tim Brown <[email protected]>
AuthorDate: Thu Aug 24 10:58:19 2023 -0500

    [HUDI-6741] Timeline server bug when multiple tables registered with metadata table enabled (#9511)
---
 .../client/embedded/EmbeddedTimelineService.java   |  2 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |  4 +-
 .../TestRemoteFileSystemViewWithMetadataTable.java | 63 ++++++++++++++++------
 .../common/table/view/FileSystemViewManager.java   | 27 +++++-----
 4 files changed, 63 insertions(+), 33 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index c79942524f1..7d794366ba0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -70,7 +70,7 @@ public class EmbeddedTimelineService {
       // Reset to default if set to Remote
       builder.withStorageType(FileSystemViewStorageType.MEMORY);
     }
-    return FileSystemViewManager.createViewManager(context, 
writeConfig.getMetadataConfig(), builder.build(), 
writeConfig.getCommonConfig(), basePath);
+    return FileSystemViewManager.createViewManagerWithTableMetadata(context, 
writeConfig.getMetadataConfig(), builder.build(), 
writeConfig.getCommonConfig());
   }
 
   public void startServer() throws IOException {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 59fa69de2e6..f1de637edf5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -145,7 +145,7 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
         .build();
     this.metadata = HoodieTableMetadata.create(context, metadataConfig, 
config.getBasePath());
 
-    this.viewManager = FileSystemViewManager.createViewManager(context, 
config.getMetadataConfig(), config.getViewStorageConfig(), 
config.getCommonConfig(), () -> metadata);
+    this.viewManager = FileSystemViewManager.createViewManager(context, 
config.getMetadataConfig(), config.getViewStorageConfig(), 
config.getCommonConfig(), unused -> metadata);
     this.metaClient = metaClient;
     this.index = getIndex(config, context);
     this.storageLayout = getStorageLayout(config);
@@ -164,7 +164,7 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
 
   private synchronized FileSystemViewManager getViewManager() {
     if (null == viewManager) {
-      viewManager = FileSystemViewManager.createViewManager(getContext(), 
config.getMetadataConfig(), config.getViewStorageConfig(), 
config.getCommonConfig(), () -> metadata);
+      viewManager = FileSystemViewManager.createViewManager(getContext(), 
config.getMetadataConfig(), config.getViewStorageConfig(), 
config.getCommonConfig(), unused -> metadata);
     }
     return viewManager;
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
index a6e304daaa4..adb47cc0694 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
@@ -36,6 +36,7 @@ import 
org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -57,9 +58,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -83,7 +86,6 @@ public class TestRemoteFileSystemViewWithMetadataTable 
extends HoodieSparkClient
     initPath();
     initSparkContexts();
     initFileSystem();
-    initMetaClient();
     initTimelineService();
     dataGen = new HoodieTestDataGenerator(0x1f86);
   }
@@ -102,7 +104,7 @@ public class TestRemoteFileSystemViewWithMetadataTable 
extends HoodieSparkClient
   @Override
   public void initTimelineService() {
     // Start a timeline server that are running across multiple commits
-    HoodieLocalEngineContext localEngineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf());
+    HoodieLocalEngineContext localEngineContext = new 
HoodieLocalEngineContext(hadoopConf);
 
     try {
       HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
@@ -117,8 +119,8 @@ public class TestRemoteFileSystemViewWithMetadataTable 
extends HoodieSparkClient
           FileSystemViewManager.createViewManager(
               context, config.getMetadataConfig(), 
config.getViewStorageConfig(),
               config.getCommonConfig(),
-              () -> new HoodieBackedTestDelayedTableMetadata(
-                  context, config.getMetadataConfig(), basePath, true)));
+              metaClient -> new HoodieBackedTestDelayedTableMetadata(
+                  context, config.getMetadataConfig(), 
metaClient.getBasePathV2().toString(), true)));
       timelineService.startService();
       timelineServicePort = timelineService.getServerPort();
       LOG.info("Started timeline server on port: " + timelineServicePort);
@@ -133,23 +135,39 @@ public class TestRemoteFileSystemViewWithMetadataTable 
extends HoodieSparkClient
     // This test utilizes the `HoodieBackedTestDelayedTableMetadata` to make 
sure the
     // synced file system view is always served.
 
-    SparkRDDWriteClient writeClient = createWriteClient(
+    // Create two tables to guarantee the timeline server can properly handle 
multiple base paths with metadata table enabled
+    String basePathStr1 = initializeTable("dataset1");
+    String basePathStr2 = initializeTable("dataset2");
+    try (SparkRDDWriteClient writeClient1 = createWriteClient(basePathStr1, 
"test_mor_table1",
         useExistingTimelineServer ? Option.of(timelineService) : 
Option.empty());
+         SparkRDDWriteClient writeClient2 = createWriteClient(basePathStr2, 
"test_mor_table2",
+             useExistingTimelineServer ? Option.of(timelineService) : 
Option.empty())) {
+      for (int i = 0; i < 3; i++) {
+        writeToTable(i, writeClient1);
+      }
+
+
+      for (int i = 0; i < 3; i++) {
+        writeToTable(i, writeClient2);
+      }
 
-    for (int i = 0; i < 3; i++) {
-      writeToTable(i, writeClient);
+      runAssertionsForBasePath(useExistingTimelineServer, basePathStr1, 
writeClient1);
+      runAssertionsForBasePath(useExistingTimelineServer, basePathStr2, 
writeClient2);
     }
+  }
 
+  private void runAssertionsForBasePath(boolean useExistingTimelineServer, 
String basePathStr, SparkRDDWriteClient writeClient) throws IOException {
     // At this point, there are three deltacommits and one compaction commit 
in the Hudi timeline,
     // and the file system view of timeline server is not yet synced
     HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.builder()
-        .setConf(metaClient.getHadoopConf())
-        .setBasePath(basePath)
+        .setConf(hadoopConf)
+        .setBasePath(basePathStr)
         .build();
     HoodieActiveTimeline timeline = newMetaClient.getActiveTimeline();
     HoodieInstant compactionCommit = timeline.lastInstant().get();
     assertTrue(timeline.lastInstant().get().getAction().equals(COMMIT_ACTION));
 
+
     // For all the file groups compacted by the compaction commit, the file 
system view
     // should return the latest file slices which is written by the latest 
commit
     HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
@@ -175,10 +193,10 @@ public class TestRemoteFileSystemViewWithMetadataTable 
extends HoodieSparkClient
 
     LOG.info("Connecting to Timeline Server: " + timelineServerPort);
     RemoteHoodieTableFileSystemView view =
-        new RemoteHoodieTableFileSystemView("localhost", timelineServerPort, 
metaClient);
+        new RemoteHoodieTableFileSystemView("localhost", timelineServerPort, 
newMetaClient);
 
     List<TestViewLookUpCallable> callableList = lookupList.stream()
-        .map(pair -> new TestViewLookUpCallable(view, pair, 
compactionCommit.getTimestamp()))
+        .map(pair -> new TestViewLookUpCallable(view, pair, 
compactionCommit.getTimestamp(), basePathStr))
         .collect(Collectors.toList());
     List<Future<Boolean>> resultList = new ArrayList<>();
 
@@ -195,6 +213,15 @@ public class TestRemoteFileSystemViewWithMetadataTable 
extends HoodieSparkClient
         return false;
       }
     }).reduce((a, b) -> a && b).get());
+    pool.shutdown();
+  }
+
+  private String initializeTable(String dataset) throws IOException {
+    java.nio.file.Path basePath = tempDir.resolve(dataset);
+    Files.createDirectories(basePath);
+    String basePathStr = basePath.toAbsolutePath().toString();
+    HoodieTestUtils.init(hadoopConf, basePathStr, 
HoodieTableType.MERGE_ON_READ, new Properties());
+    return basePathStr;
   }
 
   @Override
@@ -202,7 +229,7 @@ public class TestRemoteFileSystemViewWithMetadataTable 
extends HoodieSparkClient
     return HoodieTableType.MERGE_ON_READ;
   }
 
-  private SparkRDDWriteClient createWriteClient(Option<TimelineService> 
timelineService) {
+  private SparkRDDWriteClient createWriteClient(String basePath, String 
tableName, Option<TimelineService> timelineService) {
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
         .withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
@@ -221,7 +248,7 @@ public class TestRemoteFileSystemViewWithMetadataTable 
extends HoodieSparkClient
                 ? timelineService.get().getServerPort() : 
REMOTE_PORT_NUM.defaultValue())
             .build())
         .withAutoCommit(false)
-        .forTable("test_mor_table")
+        .forTable(tableName)
         .build();
     return new SparkRDDWriteClient(context, writeConfig, timelineService);
   }
@@ -248,22 +275,26 @@ public class TestRemoteFileSystemViewWithMetadataTable 
extends HoodieSparkClient
     private final RemoteHoodieTableFileSystemView view;
     private final Pair<String, String> partitionFileIdPair;
     private final String expectedCommitTime;
+    private final String expectedBasePath;
 
     public TestViewLookUpCallable(
         RemoteHoodieTableFileSystemView view,
         Pair<String, String> partitionFileIdPair,
-        String expectedCommitTime) {
+        String expectedCommitTime,
+        String expectedBasePath) {
       this.view = view;
       this.partitionFileIdPair = partitionFileIdPair;
       this.expectedCommitTime = expectedCommitTime;
+      this.expectedBasePath = expectedBasePath;
     }
 
     @Override
     public Boolean call() throws Exception {
       Option<FileSlice> latestFileSlice = view.getLatestFileSlice(
           partitionFileIdPair.getLeft(), partitionFileIdPair.getRight());
-      boolean result = latestFileSlice.isPresent() && 
expectedCommitTime.equals(
-          FSUtils.getCommitTime(new 
Path(latestFileSlice.get().getBaseFile().get().getPath()).getName()));
+      String latestBaseFilePath = 
latestFileSlice.get().getBaseFile().get().getPath();
+      boolean result = latestFileSlice.isPresent() && 
latestBaseFilePath.startsWith(expectedBasePath)
+          && expectedCommitTime.equals(FSUtils.getCommitTime(new 
Path(latestBaseFilePath).getName()));
       if (!result) {
         LOG.error("The timeline server does not return the correct result: 
latestFileSliceReturned="
             + latestFileSlice + " expectedCommitTime=" + expectedCommitTime);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index 345f8e668ae..d729cc94d10 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieMetaserverConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.function.SerializableSupplier;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Functions.Function2;
@@ -161,12 +161,12 @@ public class FileSystemViewManager {
    *
    */
   private static HoodieTableFileSystemView 
createInMemoryFileSystemView(HoodieMetadataConfig metadataConfig, 
FileSystemViewStorageConfig viewConf,
-                                                                        
HoodieTableMetaClient metaClient, SerializableSupplier<HoodieTableMetadata> 
metadataSupplier) {
+                                                                        
HoodieTableMetaClient metaClient, 
SerializableFunctionUnchecked<HoodieTableMetaClient, HoodieTableMetadata> 
metadataCreator) {
     LOG.info("Creating InMemory based view for basePath " + 
metaClient.getBasePathV2());
     HoodieTimeline timeline = 
metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
     if (metaClient.getTableConfig().isMetadataTableAvailable()) {
-      ValidationUtils.checkArgument(metadataSupplier != null, "Metadata 
supplier is null. Cannot instantiate metadata file system view");
-      return new HoodieMetadataFileSystemView(metaClient, timeline, 
metadataSupplier.get());
+      ValidationUtils.checkArgument(metadataCreator != null, "Metadata 
supplier is null. Cannot instantiate metadata file system view");
+      return new HoodieMetadataFileSystemView(metaClient, timeline, 
metadataCreator.apply(metaClient));
     }
     if (metaClient.getMetaserverConfig().isMetaserverEnabled()) {
       return (HoodieTableFileSystemView) 
ReflectionUtils.loadClass(HOODIE_METASERVER_FILE_SYSTEM_VIEW_CLASS,
@@ -220,16 +220,15 @@ public class FileSystemViewManager {
                                                         final 
HoodieMetadataConfig metadataConfig,
                                                         final 
FileSystemViewStorageConfig config,
                                                         final 
HoodieCommonConfig commonConfig) {
-    return createViewManager(context, metadataConfig, config, commonConfig, 
(SerializableSupplier<HoodieTableMetadata>) null);
+    return createViewManager(context, metadataConfig, config, commonConfig, 
null);
   }
 
-  public static FileSystemViewManager createViewManager(final 
HoodieEngineContext context,
-                                                        final 
HoodieMetadataConfig metadataConfig,
-                                                        final 
FileSystemViewStorageConfig config,
-                                                        final 
HoodieCommonConfig commonConfig,
-                                                        final String basePath) 
{
+  public static FileSystemViewManager createViewManagerWithTableMetadata(final 
HoodieEngineContext context,
+                                                                         final 
HoodieMetadataConfig metadataConfig,
+                                                                         final 
FileSystemViewStorageConfig config,
+                                                                         final 
HoodieCommonConfig commonConfig) {
     return createViewManager(context, metadataConfig, config, commonConfig,
-        () -> HoodieTableMetadata.create(context, metadataConfig, basePath, 
true));
+        metaClient -> HoodieTableMetadata.create(context, metadataConfig, 
metaClient.getBasePathV2().toString(), true));
   }
 
   /**
@@ -240,7 +239,7 @@ public class FileSystemViewManager {
                                                         final 
HoodieMetadataConfig metadataConfig,
                                                         final 
FileSystemViewStorageConfig config,
                                                         final 
HoodieCommonConfig commonConfig,
-                                                        final 
SerializableSupplier<HoodieTableMetadata> metadataSupplier) {
+                                                        final 
SerializableFunctionUnchecked<HoodieTableMetaClient, HoodieTableMetadata> 
metadataCreator) {
     LOG.info("Creating View Manager with storage type :" + 
config.getStorageType());
     final SerializableConfiguration conf = context.getHadoopConf();
     switch (config.getStorageType()) {
@@ -255,7 +254,7 @@ public class FileSystemViewManager {
       case MEMORY:
         LOG.info("Creating in-memory based Table View");
         return new FileSystemViewManager(context, config,
-            (metaClient, viewConfig) -> 
createInMemoryFileSystemView(metadataConfig, viewConfig, metaClient, 
metadataSupplier));
+            (metaClient, viewConfig) -> 
createInMemoryFileSystemView(metadataConfig, viewConfig, metaClient, 
metadataCreator));
       case REMOTE_ONLY:
         LOG.info("Creating remote only table view");
         return new FileSystemViewManager(context, config, (metaClient, 
viewConfig) -> createRemoteFileSystemView(conf,
@@ -268,7 +267,7 @@ public class FileSystemViewManager {
           SyncableFileSystemView secondaryView;
           switch (viewConfig.getSecondaryStorageType()) {
             case MEMORY:
-              secondaryView = createInMemoryFileSystemView(metadataConfig, 
viewConfig, metaClient, metadataSupplier);
+              secondaryView = createInMemoryFileSystemView(metadataConfig, 
viewConfig, metaClient, metadataCreator);
               break;
             case EMBEDDED_KV_STORE:
               secondaryView = createRocksDBBasedFileSystemView(conf, 
viewConfig, metaClient);

Reply via email to