This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 8d0e813967a29077cca52fca74e468db0cb2bc24 Author: Tim Brown <[email protected]> AuthorDate: Thu Aug 24 10:58:19 2023 -0500 [HUDI-6741] Timeline server bug when multiple tables registered with metadata table enabled (#9511) --- .../client/embedded/EmbeddedTimelineService.java | 2 +- .../java/org/apache/hudi/table/HoodieTable.java | 4 +- .../TestRemoteFileSystemViewWithMetadataTable.java | 63 ++++++++++++++++------ .../common/table/view/FileSystemViewManager.java | 27 +++++----- 4 files changed, 63 insertions(+), 33 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index c79942524f1..7d794366ba0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -70,7 +70,7 @@ public class EmbeddedTimelineService { // Reset to default if set to Remote builder.withStorageType(FileSystemViewStorageType.MEMORY); } - return FileSystemViewManager.createViewManager(context, writeConfig.getMetadataConfig(), builder.build(), writeConfig.getCommonConfig(), basePath); + return FileSystemViewManager.createViewManagerWithTableMetadata(context, writeConfig.getMetadataConfig(), builder.build(), writeConfig.getCommonConfig()); } public void startServer() throws IOException { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 59fa69de2e6..f1de637edf5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -145,7 +145,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { .build(); this.metadata = HoodieTableMetadata.create(context, metadataConfig, config.getBasePath()); - this.viewManager = FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), () -> metadata); + this.viewManager = FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), unused -> metadata); this.metaClient = metaClient; this.index = getIndex(config, context); this.storageLayout = getStorageLayout(config); @@ -164,7 +164,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { private synchronized FileSystemViewManager getViewManager() { if (null == viewManager) { - viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), () -> metadata); + viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), unused -> metadata); } return viewManager; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java index a6e304daaa4..adb47cc0694 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java @@ -36,6 +36,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; @@ -57,9 +58,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -83,7 +86,6 @@ public class TestRemoteFileSystemViewWithMetadataTable extends HoodieSparkClient initPath(); initSparkContexts(); initFileSystem(); - initMetaClient(); initTimelineService(); dataGen = new HoodieTestDataGenerator(0x1f86); } @@ -102,7 +104,7 @@ public class TestRemoteFileSystemViewWithMetadataTable extends HoodieSparkClient @Override public void initTimelineService() { // Start a timeline server that are running across multiple commits - HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); + HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(hadoopConf); try { HoodieWriteConfig config = HoodieWriteConfig.newBuilder() @@ -117,8 +119,8 @@ public class TestRemoteFileSystemViewWithMetadataTable extends HoodieSparkClient FileSystemViewManager.createViewManager( context, config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), - () -> new HoodieBackedTestDelayedTableMetadata( - context, config.getMetadataConfig(), basePath, true))); + metaClient -> new HoodieBackedTestDelayedTableMetadata( + context, config.getMetadataConfig(), metaClient.getBasePathV2().toString(), true))); timelineService.startService(); timelineServicePort = timelineService.getServerPort(); LOG.info("Started timeline server on port: " + timelineServicePort); @@ -133,23 +135,39 @@ public class TestRemoteFileSystemViewWithMetadataTable extends HoodieSparkClient // This test utilizes the `HoodieBackedTestDelayedTableMetadata` to make sure the // synced file system view is always served. - SparkRDDWriteClient writeClient = createWriteClient( + // Create two tables to guarantee the timeline server can properly handle multiple base paths with metadata table enabled + String basePathStr1 = initializeTable("dataset1"); + String basePathStr2 = initializeTable("dataset2"); + try (SparkRDDWriteClient writeClient1 = createWriteClient(basePathStr1, "test_mor_table1", useExistingTimelineServer ? Option.of(timelineService) : Option.empty()); + SparkRDDWriteClient writeClient2 = createWriteClient(basePathStr2, "test_mor_table2", + useExistingTimelineServer ? Option.of(timelineService) : Option.empty())) { + for (int i = 0; i < 3; i++) { + writeToTable(i, writeClient1); + } + + + for (int i = 0; i < 3; i++) { + writeToTable(i, writeClient2); + } - for (int i = 0; i < 3; i++) { - writeToTable(i, writeClient); + runAssertionsForBasePath(useExistingTimelineServer, basePathStr1, writeClient1); + runAssertionsForBasePath(useExistingTimelineServer, basePathStr2, writeClient2); } + } + private void runAssertionsForBasePath(boolean useExistingTimelineServer, String basePathStr, SparkRDDWriteClient writeClient) throws IOException { // At this point, there are three deltacommits and one compaction commit in the Hudi timeline, // and the file system view of timeline server is not yet synced HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.builder() - .setConf(metaClient.getHadoopConf()) - .setBasePath(basePath) + .setConf(hadoopConf) + .setBasePath(basePathStr) .build(); HoodieActiveTimeline timeline = newMetaClient.getActiveTimeline(); HoodieInstant compactionCommit = timeline.lastInstant().get(); assertTrue(timeline.lastInstant().get().getAction().equals(COMMIT_ACTION)); + // For all the file groups compacted by the compaction commit, the file system view // should return the latest file slices which is written by the latest commit HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( @@ -175,10 +193,10 @@ public class TestRemoteFileSystemViewWithMetadataTable extends HoodieSparkClient LOG.info("Connecting to Timeline Server: " + timelineServerPort); RemoteHoodieTableFileSystemView view = - new RemoteHoodieTableFileSystemView("localhost", timelineServerPort, metaClient); + new RemoteHoodieTableFileSystemView("localhost", timelineServerPort, newMetaClient); List<TestViewLookUpCallable> callableList = lookupList.stream() - .map(pair -> new TestViewLookUpCallable(view, pair, compactionCommit.getTimestamp())) + .map(pair -> new TestViewLookUpCallable(view, pair, compactionCommit.getTimestamp(), basePathStr)) .collect(Collectors.toList()); List<Future<Boolean>> resultList = new ArrayList<>(); @@ -195,6 +213,15 @@ public class TestRemoteFileSystemViewWithMetadataTable extends HoodieSparkClient return false; } }).reduce((a, b) -> a && b).get()); + pool.shutdown(); + } + + private String initializeTable(String dataset) throws IOException { + java.nio.file.Path basePath = tempDir.resolve(dataset); + Files.createDirectories(basePath); + String basePathStr = basePath.toAbsolutePath().toString(); + HoodieTestUtils.init(hadoopConf, basePathStr, HoodieTableType.MERGE_ON_READ, new Properties()); + return basePathStr; } @Override @@ -202,7 +229,7 @@ public class TestRemoteFileSystemViewWithMetadataTable extends HoodieSparkClient return HoodieTableType.MERGE_ON_READ; } - private SparkRDDWriteClient createWriteClient(Option<TimelineService> timelineService) { + private SparkRDDWriteClient createWriteClient(String basePath, String tableName, Option<TimelineService> timelineService) { HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() .withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) @@ -221,7 +248,7 @@ public class TestRemoteFileSystemViewWithMetadataTable extends HoodieSparkClient ? timelineService.get().getServerPort() : REMOTE_PORT_NUM.defaultValue()) .build()) .withAutoCommit(false) - .forTable("test_mor_table") + .forTable(tableName) .build(); return new SparkRDDWriteClient(context, writeConfig, timelineService); } @@ -248,22 +275,26 @@ public class TestRemoteFileSystemViewWithMetadataTable extends HoodieSparkClient private final RemoteHoodieTableFileSystemView view; private final Pair<String, String> partitionFileIdPair; private final String expectedCommitTime; + private final String expectedBasePath; public TestViewLookUpCallable( RemoteHoodieTableFileSystemView view, Pair<String, String> partitionFileIdPair, - String expectedCommitTime) { + String expectedCommitTime, + String expectedBasePath) { this.view = view; this.partitionFileIdPair = partitionFileIdPair; this.expectedCommitTime = expectedCommitTime; + this.expectedBasePath = expectedBasePath; } @Override public Boolean call() throws Exception { Option<FileSlice> latestFileSlice = view.getLatestFileSlice( partitionFileIdPair.getLeft(), partitionFileIdPair.getRight()); - boolean result = latestFileSlice.isPresent() && expectedCommitTime.equals( - FSUtils.getCommitTime(new Path(latestFileSlice.get().getBaseFile().get().getPath()).getName())); + String latestBaseFilePath = latestFileSlice.get().getBaseFile().get().getPath(); + boolean result = latestFileSlice.isPresent() && latestBaseFilePath.startsWith(expectedBasePath) + && expectedCommitTime.equals(FSUtils.getCommitTime(new Path(latestBaseFilePath).getName())); if (!result) { LOG.error("The timeline server does not return the correct result: latestFileSliceReturned=" + latestFileSlice + " expectedCommitTime=" + expectedCommitTime); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index 345f8e668ae..d729cc94d10 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -23,7 +23,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieMetaserverConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.function.SerializableSupplier; +import org.apache.hudi.common.function.SerializableFunctionUnchecked; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Functions.Function2; @@ -161,12 +161,12 @@ public class FileSystemViewManager { * */ private static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieMetadataConfig metadataConfig, FileSystemViewStorageConfig viewConf, - HoodieTableMetaClient metaClient, SerializableSupplier<HoodieTableMetadata> metadataSupplier) { + HoodieTableMetaClient metaClient, SerializableFunctionUnchecked<HoodieTableMetaClient, HoodieTableMetadata> metadataCreator) { LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePathV2()); HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); if (metaClient.getTableConfig().isMetadataTableAvailable()) { - ValidationUtils.checkArgument(metadataSupplier != null, "Metadata supplier is null. Cannot instantiate metadata file system view"); - return new HoodieMetadataFileSystemView(metaClient, timeline, metadataSupplier.get()); + ValidationUtils.checkArgument(metadataCreator != null, "Metadata supplier is null. Cannot instantiate metadata file system view"); + return new HoodieMetadataFileSystemView(metaClient, timeline, metadataCreator.apply(metaClient)); } if (metaClient.getMetaserverConfig().isMetaserverEnabled()) { return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASERVER_FILE_SYSTEM_VIEW_CLASS, @@ -220,16 +220,15 @@ public class FileSystemViewManager { final HoodieMetadataConfig metadataConfig, final FileSystemViewStorageConfig config, final HoodieCommonConfig commonConfig) { - return createViewManager(context, metadataConfig, config, commonConfig, (SerializableSupplier<HoodieTableMetadata>) null); + return createViewManager(context, metadataConfig, config, commonConfig, null); } - public static FileSystemViewManager createViewManager(final HoodieEngineContext context, - final HoodieMetadataConfig metadataConfig, - final FileSystemViewStorageConfig config, - final HoodieCommonConfig commonConfig, - final String basePath) { + public static FileSystemViewManager createViewManagerWithTableMetadata(final HoodieEngineContext context, + final HoodieMetadataConfig metadataConfig, + final FileSystemViewStorageConfig config, + final HoodieCommonConfig commonConfig) { return createViewManager(context, metadataConfig, config, commonConfig, - () -> HoodieTableMetadata.create(context, metadataConfig, basePath, true)); + metaClient -> HoodieTableMetadata.create(context, metadataConfig, metaClient.getBasePathV2().toString(), true)); } /** @@ -240,7 +239,7 @@ public class FileSystemViewManager { final HoodieMetadataConfig metadataConfig, final FileSystemViewStorageConfig config, final HoodieCommonConfig commonConfig, - final SerializableSupplier<HoodieTableMetadata> metadataSupplier) { + final SerializableFunctionUnchecked<HoodieTableMetaClient, HoodieTableMetadata> metadataCreator) { LOG.info("Creating View Manager with storage type :" + config.getStorageType()); final SerializableConfiguration conf = context.getHadoopConf(); switch (config.getStorageType()) { @@ -255,7 +254,7 @@ public class FileSystemViewManager { case MEMORY: LOG.info("Creating in-memory based Table View"); return new FileSystemViewManager(context, config, - (metaClient, viewConfig) -> createInMemoryFileSystemView(metadataConfig, viewConfig, metaClient, metadataSupplier)); + (metaClient, viewConfig) -> createInMemoryFileSystemView(metadataConfig, viewConfig, metaClient, metadataCreator)); case REMOTE_ONLY: LOG.info("Creating remote only table view"); return new FileSystemViewManager(context, config, (metaClient, viewConfig) -> createRemoteFileSystemView(conf, @@ -268,7 +267,7 @@ public class FileSystemViewManager { SyncableFileSystemView secondaryView; switch (viewConfig.getSecondaryStorageType()) { case MEMORY: - secondaryView = createInMemoryFileSystemView(metadataConfig, viewConfig, metaClient, metadataSupplier); + secondaryView = createInMemoryFileSystemView(metadataConfig, viewConfig, metaClient, metadataCreator); break; case EMBEDDED_KV_STORE: secondaryView = createRocksDBBasedFileSystemView(conf, viewConfig, metaClient);
