This is an automated email from the ASF dual-hosted git repository. voonhous pushed a commit to tag rfc-105-pre-cleanup in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 64baf5d2366537f017df2ad534980d93c4a760c9 Author: voon <[email protected]> AuthorDate: Mon May 25 19:12:22 2026 +0800 fix(trino): port hudi-trino to Trino 482 SPI and restore metadata table fallback Bumps trino.connector.version to 482-SNAPSHOT and ports the connector for the SPI drift since 480: - CachingHostAddressProvider was deleted upstream; drop its usage from the split factory / split manager / split source / background loader and pass empty host lists through HudiSplit construction. - Wire ConnectorContextModule (from io.trino.plugin.base) in HudiConnectorFactory instead of binding Node/NodeVersion/NodeManager/OpenTelemetry/Tracer/TypeManager/CatalogName ad hoc. - Add io.airlift:configuration-testing as a test dependency for the config-tester used by the Trino-side test scaffolding. Restores the construction-aware metadata-table fallback in HudiSplitSource. The Hudi 1.1.0 bump replaced HoodieTableMetadata.create(...) with an explicit ternary keyed on tableConfig.isMetadataTableAvailable(), losing the 1.0.2 factory's "construct HoodieBackedTableMetadata, check isMetadataTableInitialized(), fall back to FileSystemBackedTableMetadata otherwise" semantics. Open-codes that logic against the Hudi 1.3 public API so tables whose metadata table is configured but not initialized on disk still serve queries. 
--- hudi-trino-plugin/pom.xml | 6 +++++ .../io/trino/plugin/hudi/HudiConnectorFactory.java | 18 +++---------- .../io/trino/plugin/hudi/HudiSplitManager.java | 9 ++----- .../java/io/trino/plugin/hudi/HudiSplitSource.java | 26 ++++++++++++++----- .../hudi/split/HudiBackgroundSplitLoader.java | 4 +-- .../trino/plugin/hudi/split/HudiSplitFactory.java | 30 +++++++++------------- .../java/io/trino/plugin/hudi/HudiQueryRunner.java | 8 +++--- .../io/trino/plugin/hudi/TestHudiPageSource.java | 2 +- .../io/trino/plugin/hudi/TestHudiSmokeTest.java | 2 +- pom.xml | 2 +- 10 files changed, 50 insertions(+), 57 deletions(-) diff --git a/hudi-trino-plugin/pom.xml b/hudi-trino-plugin/pom.xml index a2cc55fa1fe8..3dda76c1de18 100644 --- a/hudi-trino-plugin/pom.xml +++ b/hudi-trino-plugin/pom.xml @@ -356,6 +356,12 @@ <scope>runtime</scope> </dependency> + <dependency> + <groupId>io.airlift</groupId> + <artifactId>configuration-testing</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>io.airlift</groupId> <artifactId>junit-extensions</artifactId> diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java index 226c7c507b4c..cb5a565691c4 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java @@ -22,18 +22,14 @@ import io.airlift.bootstrap.Bootstrap; import io.airlift.bootstrap.LifeCycleManager; import io.airlift.configuration.AbstractConfigurationAwareModule; import io.airlift.json.JsonModule; -import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.trace.Tracer; import io.trino.filesystem.manager.FileSystemModule; +import io.trino.plugin.base.ConnectorContextModule; import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider; import 
io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager; import io.trino.plugin.base.classloader.ClassLoaderSafeNodePartitioningProvider; import io.trino.plugin.base.jmx.MBeanServerModule; import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.hive.metastore.HiveMetastoreModule; -import io.trino.spi.NodeManager; -import io.trino.spi.NodeVersion; -import io.trino.spi.catalog.CatalogName; import io.trino.spi.classloader.ThreadContextClassLoader; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; @@ -41,7 +37,6 @@ import io.trino.spi.connector.ConnectorFactory; import io.trino.spi.connector.ConnectorNodePartitioningProvider; import io.trino.spi.connector.ConnectorPageSourceProvider; import io.trino.spi.connector.ConnectorSplitManager; -import io.trino.spi.type.TypeManager; import org.weakref.jmx.guice.MBeanModule; import java.util.Map; @@ -83,15 +78,8 @@ public class HudiConnectorFactory new HiveMetastoreModule(Optional.empty(), false), new HudiFileSystemModule(catalogName, context), new MBeanServerModule(), - module.orElse(EMPTY_MODULE), - binder -> { - binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry()); - binder.bind(Tracer.class).toInstance(context.getTracer()); - binder.bind(NodeVersion.class).toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion())); - binder.bind(NodeManager.class).toInstance(context.getNodeManager()); - binder.bind(TypeManager.class).toInstance(context.getTypeManager()); - binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName)); - }); + new ConnectorContextModule(catalogName, context), + module.orElse(EMPTY_MODULE)); Injector injector = app .doNotInitializeLogging() diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java index 79f03ccd07b2..8f774e3f5f15 100644 --- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java @@ -17,7 +17,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import io.airlift.log.Logger; -import io.trino.filesystem.cache.CachingHostAddressProvider; import io.trino.metastore.HiveMetastore; import io.trino.metastore.Partition; import io.trino.metastore.StorageFormat; @@ -61,19 +60,16 @@ public class HudiSplitManager private final BiFunction<ConnectorIdentity, HiveTransactionHandle, HiveMetastore> metastoreProvider; private final ExecutorService executor; private final ScheduledExecutorService splitLoaderExecutorService; - private final CachingHostAddressProvider cachingHostAddressProvider; @Inject public HudiSplitManager( BiFunction<ConnectorIdentity, HiveTransactionHandle, HiveMetastore> metastoreProvider, @ForHudiSplitManager ExecutorService executor, - @ForHudiSplitSource ScheduledExecutorService splitLoaderExecutorService, - CachingHostAddressProvider cachingHostAddressProvider) + @ForHudiSplitSource ScheduledExecutorService splitLoaderExecutorService) { this.metastoreProvider = requireNonNull(metastoreProvider, "metastoreProvider is null"); this.executor = requireNonNull(executor, "executor is null"); this.splitLoaderExecutorService = requireNonNull(splitLoaderExecutorService, "splitLoaderExecutorService is null"); - this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null"); } @Override @@ -103,8 +99,7 @@ public class HudiSplitManager getMaxOutstandingSplits(session), lazyAllPartitions, dynamicFilter, - getDynamicFilteringWaitTimeout(session), - cachingHostAddressProvider); + getDynamicFilteringWaitTimeout(session)); return new ClassLoaderSafeConnectorSplitSource(splitSource, HudiSplitManager.class.getClassLoader()); } diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java index 6ea49d9baaf8..5e1700776b3f 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java @@ -19,7 +19,6 @@ import com.google.common.util.concurrent.Futures; import io.airlift.log.Logger; import io.airlift.units.DataSize; import io.airlift.units.Duration; -import io.trino.filesystem.cache.CachingHostAddressProvider; import io.trino.metastore.Partition; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HivePartitionKey; @@ -94,8 +93,7 @@ public class HudiSplitSource int maxOutstandingSplits, Lazy<Map<String, Partition>> lazyPartitions, DynamicFilter dynamicFilter, - Duration dynamicFilteringWaitTimeoutMillis, - CachingHostAddressProvider cachingHostAddressProvider) + Duration dynamicFilteringWaitTimeoutMillis) { boolean enableMetadataTable = isHudiMetadataTableEnabled(session); Lazy<HoodieTableMetadata> lazyTableMetadata = Lazy.lazily(() -> { @@ -106,9 +104,24 @@ public class HudiSplitSource HoodieTableMetaClient metaClient = tableHandle.getMetaClient(); HoodieEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getStorage().getConf()); - HoodieTableMetadata tableMetadata = enableMetadataTable && tableHandle.getMetaClient().getTableConfig().isMetadataTableAvailable() ? 
- new HoodieBackedTableMetadata(engineContext, tableHandle.getMetaClient().getStorage(), metadataConfig, metaClient.getBasePath().toString(), true) : - new FileSystemBackedTableMetadata(engineContext, tableHandle.getMetaClient().getStorage(), metaClient.getBasePath().toString()); + HoodieTableMetadata tableMetadata; + if (enableMetadataTable) { + HoodieBackedTableMetadata mdt = new HoodieBackedTableMetadata( + engineContext, metaClient.getStorage(), metadataConfig, metaClient.getBasePath().toString(), true); + if (mdt.isMetadataTableInitialized()) { + tableMetadata = mdt; + } + else { + log.warn("Metadata table not initialized on disk for %s; falling back to FileSystemBackedTableMetadata", + tableHandle.getSchemaTableName()); + tableMetadata = new FileSystemBackedTableMetadata( + engineContext, metaClient.getStorage(), metaClient.getBasePath().toString()); + } + } + else { + tableMetadata = new FileSystemBackedTableMetadata( + engineContext, metaClient.getStorage(), metaClient.getBasePath().toString()); + } log.info("Loaded table metadata for table: %s in %s ms", tableHandle.getSchemaTableName(), timer.endTimer()); return tableMetadata; }); @@ -130,7 +143,6 @@ public class HudiSplitSource lazyPartitions, enableMetadataTable, lazyTableMetadata, - cachingHostAddressProvider, throwable -> { trinoException.compareAndSet(null, new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Failed to generate splits for " + tableHandle.getSchemaTableName(), throwable)); diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java index 999e36f1ea0d..796413beb94f 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java @@ -19,7 +19,6 @@ import com.google.common.util.concurrent.ListenableFuture; import 
io.airlift.concurrent.BoundedExecutor; import io.airlift.log.Logger; import io.trino.filesystem.Location; -import io.trino.filesystem.cache.CachingHostAddressProvider; import io.trino.metastore.Column; import io.trino.metastore.Partition; import io.trino.metastore.StorageFormat; @@ -101,14 +100,13 @@ public class HudiBackgroundSplitLoader Lazy<Map<String, Partition>> lazyPartitionMap, boolean enableMetadataTable, Lazy<HoodieTableMetadata> lazyTableMetadata, - CachingHostAddressProvider cachingHostAddressProvider, Consumer<Throwable> errorListener) { this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); this.hudiDirectoryLister = requireNonNull(hudiDirectoryLister, "hudiDirectoryLister is null"); this.asyncQueue = requireNonNull(asyncQueue, "asyncQueue is null"); this.splitGeneratorNumThreads = getSplitGeneratorParallelism(session); - this.hudiSplitFactory = new HudiSplitFactory(tableHandle, hudiSplitWeightProvider, getTargetSplitSize(session), cachingHostAddressProvider); + this.hudiSplitFactory = new HudiSplitFactory(tableHandle, hudiSplitWeightProvider, getTargetSplitSize(session)); this.lazyPartitionMap = requireNonNull(lazyPartitionMap, "partitions is null"); this.enableMetadataTable = enableMetadataTable; this.executor = requireNonNull(executor, "executor is null"); diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java index 09dbf5bd5020..71c3f0c814f6 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java @@ -15,7 +15,6 @@ package io.trino.plugin.hudi.split; import com.google.common.collect.ImmutableList; import io.airlift.units.DataSize; -import io.trino.filesystem.cache.CachingHostAddressProvider; import io.trino.plugin.hive.HivePartitionKey; import io.trino.plugin.hudi.HudiSplit; import 
io.trino.plugin.hudi.HudiTableHandle; @@ -43,23 +42,20 @@ public class HudiSplitFactory private final HudiTableHandle hudiTableHandle; private final HudiSplitWeightProvider hudiSplitWeightProvider; private final DataSize targetSplitSize; - private final CachingHostAddressProvider cachingHostAddressProvider; public HudiSplitFactory( HudiTableHandle hudiTableHandle, HudiSplitWeightProvider hudiSplitWeightProvider, - DataSize targetSplitSize, - CachingHostAddressProvider cachingHostAddressProvider) + DataSize targetSplitSize) { this.hudiTableHandle = requireNonNull(hudiTableHandle, "hudiTableHandle is null"); this.hudiSplitWeightProvider = requireNonNull(hudiSplitWeightProvider, "hudiSplitWeightProvider is null"); this.targetSplitSize = requireNonNull(targetSplitSize, "targetSplitSize is null"); - this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null"); } public List<HudiSplit> createSplits(List<HivePartitionKey> partitionKeys, FileSlice fileSlice, String commitTime) { - return createHudiSplits(hudiTableHandle, partitionKeys, fileSlice, commitTime, hudiSplitWeightProvider, targetSplitSize, cachingHostAddressProvider); + return createHudiSplits(hudiTableHandle, partitionKeys, fileSlice, commitTime, hudiSplitWeightProvider, targetSplitSize); } /** @@ -75,8 +71,7 @@ public class HudiSplitFactory FileSlice fileSlice, String commitTime, HudiSplitWeightProvider hudiSplitWeightProvider, - DataSize targetSplitSize, - CachingHostAddressProvider cachingHostAddressProvider) + DataSize targetSplitSize) { if (fileSlice.isEmpty()) { throw new TrinoException(HUDI_FILESYSTEM_ERROR, format("Not a valid file slice: %s", fileSlice)); @@ -85,9 +80,9 @@ public class HudiSplitFactory if (isCopyOnWriteOrReadOptimized(hudiTableHandle, fileSlice)) { // Handle MERGE_ON_READ tables to be read in read_optimized mode // IMPORTANT: These tables will have a COPY_ON_WRITE table type due to how `HudiTableTypeUtils#fromInputFormat` - return 
createSplitsForBaseFile(hudiTableHandle, partitionKeys, fileSlice, commitTime, hudiSplitWeightProvider, targetSplitSize, cachingHostAddressProvider); + return createSplitsForBaseFile(hudiTableHandle, partitionKeys, fileSlice, commitTime, hudiSplitWeightProvider, targetSplitSize); } - return createSplitForMergeOnRead(hudiTableHandle, partitionKeys, fileSlice, commitTime, hudiSplitWeightProvider, cachingHostAddressProvider); + return createSplitForMergeOnRead(hudiTableHandle, partitionKeys, fileSlice, commitTime, hudiSplitWeightProvider); } /** @@ -108,15 +103,16 @@ public class HudiSplitFactory FileSlice fileSlice, String commitTime, HudiSplitWeightProvider hudiSplitWeightProvider, - DataSize targetSplitSize, - CachingHostAddressProvider cachingHostAddressProvider) + DataSize targetSplitSize) { checkArgument(fileSlice.getBaseFile().isPresent(), "Hudi base file must exist if there are no log files in the file slice"); HoodieBaseFile baseFile = fileSlice.getBaseFile().get(); long fileSize = baseFile.getFileSize(); - List<HostAddress> addresses = cachingHostAddressProvider.getHosts(baseFile.getPath(), ImmutableList.of()); + // Trino 482 removed CachingHostAddressProvider; object-storage reads have no host + // affinity, so leave the address list empty. Workers are chosen by the scheduler. 
+ List<HostAddress> addresses = ImmutableList.of(); // If the file is empty, create a single split to represent it if (fileSize == 0) { @@ -168,14 +164,12 @@ public class HudiSplitFactory List<HivePartitionKey> partitionKeys, FileSlice fileSlice, String commitTime, - HudiSplitWeightProvider hudiSplitWeightProvider, - CachingHostAddressProvider cachingHostAddressProvider) + HudiSplitWeightProvider hudiSplitWeightProvider) { // NOTE: Some file slices may not have base files Option<HoodieBaseFile> baseFileOption = fileSlice.getBaseFile(); - List<HostAddress> addresses = baseFileOption - .map(baseFile -> cachingHostAddressProvider.getHosts(baseFile.getPath(), ImmutableList.of())) - .orElse(ImmutableList.of()); + // Trino 482 removed CachingHostAddressProvider; no host affinity for object-storage reads. + List<HostAddress> addresses = ImmutableList.of(); HudiSplit split = new HudiSplit( baseFileOption.map(HudiBaseFile::of).orElse(null), diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/HudiQueryRunner.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/HudiQueryRunner.java index 0114eb8400a2..e11efbf3a569 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/HudiQueryRunner.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/HudiQueryRunner.java @@ -35,9 +35,9 @@ import java.util.Map; import java.util.Optional; import static io.trino.testing.TestingSession.testSessionBuilder; -import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY; +import static io.trino.testing.containers.Minio.MINIO_ROOT_USER; import static io.trino.testing.containers.Minio.MINIO_REGION; -import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY; +import static io.trino.testing.containers.Minio.MINIO_ROOT_PASSWORD; import static java.util.Objects.requireNonNull; public final class HudiQueryRunner @@ -60,8 +60,8 @@ public final class HudiQueryRunner { return new Builder("s3://" + hiveMinioDataLake.getBucketName() + "/") 
.addConnectorProperty("fs.native-s3.enabled", "true") - .addConnectorProperty("s3.aws-access-key", MINIO_ACCESS_KEY) - .addConnectorProperty("s3.aws-secret-key", MINIO_SECRET_KEY) + .addConnectorProperty("s3.aws-access-key", MINIO_ROOT_USER) + .addConnectorProperty("s3.aws-secret-key", MINIO_ROOT_PASSWORD) .addConnectorProperty("s3.region", MINIO_REGION) .addConnectorProperty("s3.endpoint", hiveMinioDataLake.getMinio().getMinioAddress()) .addConnectorProperty("s3.path-style-access", "true"); diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiPageSource.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiPageSource.java index 8bb46f8fb3f5..9a36e7d38524 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiPageSource.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiPageSource.java @@ -16,7 +16,7 @@ package io.trino.plugin.hudi; import io.trino.spi.connector.ConnectorPageSource; import org.junit.jupiter.api.Test; -import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden; +import static io.trino.testing.InterfaceTestUtils.assertAllMethodsOverridden; public class TestHudiPageSource { diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java index dbb7195d38ef..31402d7fb48e 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java @@ -1305,7 +1305,7 @@ public class TestHudiSmokeTest hudiSplit, new LocalInputFile(parquetFile), new FileFormatDataSourceStats(), - new ParquetReaderOptions(), + ParquetReaderOptions.builder().build(), DateTimeZone.UTC, DynamicFilter.EMPTY, true)) { MaterializedResult result = materializeSourceDataStream(session, pageSource, List.of(columnType)).toTestTypes(); assertThat(result.getMaterializedRows()) diff --git a/pom.xml 
b/pom.xml index b9448e1a940a..9f6d33e1c01f 100644 --- a/pom.xml +++ b/pom.xml @@ -130,7 +130,7 @@ <presto.version>0.273</presto.version> <trino.version>390</trino.version> <!-- Trino SPI version that hudi-trino-plugin main code compiles against. --> - <trino.connector.version>480</trino.connector.version> + <trino.connector.version>482-SNAPSHOT</trino.connector.version> <!-- Trino version used only for *-tests.jar classifier deps. Trino does not publish test-jars for tagged releases, so this tracks a snapshot. Built locally via the Trino source tree; consumers running tests need the snapshot in their m2. -->
