This is an automated email from the ASF dual-hosted git repository. voonhous pushed a commit to tag rfc-105-pre-cleanup in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2b67d23daea8d02661d1a70cd20b15dc067ea3d0
Author: voon <[email protected]>
AuthorDate: Fri May 22 20:16:21 2026 +0800

    fix(trino): port hudi-trino-plugin to Hudi 1.3 HoodieSchema APIs

    Hudi 1.3 introduces org.apache.hudi.common.schema.HoodieSchema, a wrapper
    that replaces direct use of org.apache.avro.Schema across
    HoodieReaderContext, HoodieFileReaderFactory, TableSchemaResolver, and
    HoodieFileGroupReader.Builder. The cherry-picked version bump left the
    source on the Hudi 1.1.0 APIs, so a clean compile against 1.3.0-SNAPSHOT
    failed in four files and also hit a kryo classpath issue.

    - HudiTrinoFileReaderFactory: switch all three newHFileFileReader
      overrides to Option<HoodieSchema>.
    - HudiTrinoReaderContext: switch the getFileRecordIterator overrides and
      mergeBootstrapReaders to HoodieSchema. Replace the null-returning
      mergeBootstrapReaders stub with one that throws
      UnsupportedOperationException, since the Trino connector does not use
      bootstrap merge.
    - HudiUtil.getLatestTableSchema: return HoodieSchema directly via
      TableSchemaResolver.getTableSchema(). This drops the wasteful
      HoodieSchema -> Avro -> HoodieSchema round-trip via
      toAvroSchema/fromAvroSchema at the page-source-provider boundary.
    - HudiTableHandle: propagate HoodieSchema through the field, the
      constructors, and buildTableSchema (now HoodieSchema.parse(...)).
      getTableSchemaStr uses HoodieSchema::toString.
    - HudiMetadata: change the Lazy<Schema> wrapper to Lazy<HoodieSchema>.
    - HudiPageSourceProvider / IndexSupportFactory: convert to Avro only at
      the genuine Avro boundaries (constructSchema and getFieldFromSchema).
      The requested schema built by constructSchema is wrapped back with
      HoodieSchema.fromAvroSchema before being passed to the file-group
      reader.
    - pom.xml: declare the kryo dependency at compile scope explicitly. The
      Trino BOM manages kryo at test scope, which left HoodieRecord's
      KryoSerializable superinterface unresolvable at compile time.
--- hudi-trino-plugin/pom.xml | 4 +++- .../main/java/io/trino/plugin/hudi/HudiMetadata.java | 4 ++-- .../io/trino/plugin/hudi/HudiPageSourceProvider.java | 9 +++++---- .../java/io/trino/plugin/hudi/HudiTableHandle.java | 18 +++++++++--------- .../src/main/java/io/trino/plugin/hudi/HudiUtil.java | 5 +++-- .../plugin/hudi/io/HudiTrinoFileReaderFactory.java | 8 ++++---- .../plugin/hudi/query/index/IndexSupportFactory.java | 2 +- .../plugin/hudi/reader/HudiTrinoReaderContext.java | 16 +++++++++------- 8 files changed, 36 insertions(+), 30 deletions(-) diff --git a/hudi-trino-plugin/pom.xml b/hudi-trino-plugin/pom.xml index 9670f8a3e43d..a526e593637d 100644 --- a/hudi-trino-plugin/pom.xml +++ b/hudi-trino-plugin/pom.xml @@ -59,10 +59,12 @@ <dependency> <!-- Required for compilation: HoodieRecord implements KryoSerializable, so the compiler needs Kryo on classpath to verify the class hierarchy - when loading HoodieRecord for static imports --> + when loading HoodieRecord for static imports. + Explicit compile scope overrides the Trino BOM, which manages kryo at test scope. 
--> <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> <version>4.0.2</version> + <scope>compile</scope> </dependency> <dependency> diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java index 2613be54ff09..90bdd2c6e2fd 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java @@ -47,8 +47,8 @@ import io.trino.spi.predicate.TupleDomain; import io.trino.spi.statistics.Estimate; import io.trino.spi.statistics.TableStatistics; import io.trino.spi.type.TypeManager; -import org.apache.avro.Schema; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2; @@ -163,7 +163,7 @@ public class HudiMetadata String inputFormat = table.getStorage().getStorageFormat().getInputFormat(); HoodieTableType hoodieTableType = HudiTableTypeUtils.fromInputFormat(inputFormat); Lazy<HoodieTableMetaClient> lazyMetaClient = Lazy.lazily(() -> buildTableMetaClient(fileSystem, tableName.toString(), basePath)); - Optional<Lazy<Schema>> hudiTableSchema = isResolveColumnNameCasingEnabled(session) ? + Optional<Lazy<HoodieSchema>> hudiTableSchema = isResolveColumnNameCasingEnabled(session) ? 
Optional.of(Lazy.lazily(() -> getLatestTableSchema(lazyMetaClient.get(), tableName.getTableName()))) : Optional.empty(); return new HudiTableHandle( diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java index 1553b8389a0b..980739af6dcd 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java @@ -53,6 +53,7 @@ import io.trino.spi.predicate.TupleDomain; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.read.HoodieFileGroupReader; import org.apache.hudi.common.util.ValidationUtils; @@ -223,19 +224,19 @@ public class HudiPageSourceProvider dataColumnHandles, hudiMetaAndDataColumnHandles, synthesizedColumnHandler); - Schema dataSchema = + HoodieSchema dataSchema = Optional.ofNullable(hudiTableHandle.getTableSchema()) .orElseGet(() -> getLatestTableSchema(metaClient, hudiTableHandle.getTableName())); - // Construct an Avro schema for log file reader - Schema requestedSchema = constructSchema(dataSchema, hudiMetaAndDataColumnHandles.stream().map(HiveColumnHandle::getName).toList()); + // constructSchema operates on Avro schema to assemble the requested schema for the log file reader. 
+ Schema requestedSchema = constructSchema(dataSchema.toAvroSchema(), hudiMetaAndDataColumnHandles.stream().map(HiveColumnHandle::getName).toList()); HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder() .withReaderContext(readerContext) .withHoodieTableMetaClient(metaClient) .withFileSlice(convertToFileSlice(hudiSplit, hudiTableHandle.getBasePath())) .withDataSchema(dataSchema) - .withRequestedSchema(requestedSchema) + .withRequestedSchema(HoodieSchema.fromAvroSchema(requestedSchema)) .withLatestCommitTime(hudiTableHandle.getLatestCommitTime()) .withProps(metaClient.getTableConfig().getProps()) .withShouldUseRecordPosition(false) diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java index f9f828bcca55..1b9efd1c6e18 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java @@ -24,8 +24,8 @@ import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.TupleDomain; -import org.apache.avro.Schema; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.StringUtils; @@ -54,7 +54,7 @@ public class HudiTableHandle private final Set<HiveColumnHandle> constraintColumns; private final TupleDomain<HiveColumnHandle> partitionPredicates; private final TupleDomain<HiveColumnHandle> regularPredicates; - private final Optional<Lazy<Schema>> hudiTableSchema; + private final Optional<Lazy<HoodieSchema>> hudiTableSchema; // Coordinator-only private final transient Optional<Table> table; private final transient 
Optional<Lazy<HoodieTableMetaClient>> lazyMetaClient; @@ -89,7 +89,7 @@ public class HudiTableHandle Set<HiveColumnHandle> constraintColumns, TupleDomain<HiveColumnHandle> partitionPredicates, TupleDomain<HiveColumnHandle> regularPredicates, - Optional<Lazy<Schema>> hudiTableSchema) + Optional<Lazy<HoodieSchema>> hudiTableSchema) { this( Optional.of(table), @@ -128,7 +128,7 @@ public class HudiTableHandle Set<HiveColumnHandle> constraintColumns, TupleDomain<HiveColumnHandle> partitionPredicates, TupleDomain<HiveColumnHandle> regularPredicates, - Optional<Lazy<Schema>> hudiTableSchema, + Optional<Lazy<HoodieSchema>> hudiTableSchema, Supplier<String> latestCommitTimeSupplier) { this.table = requireNonNull(table, "table is null"); @@ -147,19 +147,19 @@ public class HudiTableHandle } /** - * Builds a lazily-parsed Avro schema from the given schema string. + * Builds a lazily-parsed schema from the given Avro schema JSON string. * <p> * Returns {@code Optional.empty()} if the input string is null/empty * or if parsing the schema fails. 
*/ - private static Optional<Lazy<Schema>> buildTableSchema(String tableSchemaStr) + private static Optional<Lazy<HoodieSchema>> buildTableSchema(String tableSchemaStr) { if (StringUtils.isNullOrEmpty(tableSchemaStr)) { return Optional.empty(); } try { - Lazy<Schema> lazySchema = Lazy.lazily(() -> new Schema.Parser().parse(tableSchemaStr)); + Lazy<HoodieSchema> lazySchema = Lazy.lazily(() -> HoodieSchema.parse(tableSchemaStr)); return Optional.of(lazySchema); } catch (Exception e) { @@ -231,12 +231,12 @@ public class HudiTableHandle { return hudiTableSchema .map(Lazy::get) - .map(Schema::toString) + .map(HoodieSchema::toString) .orElse(""); } @JsonIgnore - public Schema getTableSchema() + public HoodieSchema getTableSchema() { return hudiTableSchema.map(Lazy::get).orElse(null); } diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java index 52e594bbc4be..1eb84c2fc88e 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java @@ -50,6 +50,7 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -399,11 +400,11 @@ public final class HudiUtil tableMetadata, metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()); } - public static Schema getLatestTableSchema(HoodieTableMetaClient metaClient, String tableName) + public static HoodieSchema getLatestTableSchema(HoodieTableMetaClient metaClient, String tableName) { try { HoodieTimer timer = HoodieTimer.start(); - 
Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(); + HoodieSchema schema = new TableSchemaResolver(metaClient).getTableSchema(); log.info("Fetched table schema for table %s in %s ms", tableName, timer.endTimer()); return schema; } diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/HudiTrinoFileReaderFactory.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/HudiTrinoFileReaderFactory.java index 3571de43d79e..699992cefc8e 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/HudiTrinoFileReaderFactory.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/HudiTrinoFileReaderFactory.java @@ -13,8 +13,8 @@ */ package io.trino.plugin.hudi.io; -import org.apache.avro.Schema; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.util.Option; import org.apache.hudi.io.storage.HFileReaderFactory; import org.apache.hudi.io.storage.HoodieAvroBootstrapFileReader; @@ -44,7 +44,7 @@ public class HudiTrinoFileReaderFactory @Override protected HoodieFileReader newHFileFileReader(HoodieConfig hoodieConfig, StoragePath path, - Option<Schema> schemaOption) + Option<HoodieSchema> schemaOption) throws IOException { HFileReaderFactory readerFactory = HFileReaderFactory.builder() @@ -59,7 +59,7 @@ public class HudiTrinoFileReaderFactory StoragePath path, HoodieStorage storage, byte[] content, - Option<Schema> schemaOption) + Option<HoodieSchema> schemaOption) throws IOException { HFileReaderFactory readerFactory = HFileReaderFactory.builder() @@ -70,7 +70,7 @@ public class HudiTrinoFileReaderFactory } @Override - protected HoodieFileReader newHFileFileReader(HoodieConfig hoodieConfig, StoragePathInfo pathInfo, Option<Schema> schemaOption) + protected HoodieFileReader newHFileFileReader(HoodieConfig hoodieConfig, StoragePathInfo pathInfo, Option<HoodieSchema> schemaOption) throws IOException { HFileReaderFactory readerFactory = 
HFileReaderFactory.builder() diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/IndexSupportFactory.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/IndexSupportFactory.java index 5d98177896c4..e31f3202a76e 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/IndexSupportFactory.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/IndexSupportFactory.java @@ -131,7 +131,7 @@ public class IndexSupportFactory if (isResolveColumnNameCasingEnabled(session)) { // if column case reconciliation is enabled, transform the tuple domain keys to match the column names from the Hudi table. return tupleDomain.transformKeys(hiveColumnHandle -> - getFieldFromSchema(hiveColumnHandle.getName(), hudiTableHandle.getTableSchema()).name()); + getFieldFromSchema(hiveColumnHandle.getName(), hudiTableHandle.getTableSchema().toAvroSchema()).name()); } return tupleDomain.transformKeys(HiveColumnHandle::getName); } diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java index 1f7584c3c168..cb1bc2e09a83 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java @@ -18,11 +18,11 @@ import io.trino.plugin.hudi.util.HudiAvroSerializer; import io.trino.plugin.hudi.util.SynthesizedColumnHandler; import io.trino.spi.Page; import io.trino.spi.connector.ConnectorPageSource; -import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.avro.AvroRecordContext; import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger; import 
org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.table.HoodieTableConfig; @@ -73,8 +73,8 @@ public class HudiTrinoReaderContext StoragePath storagePath, long start, long length, - Schema dataSchema, - Schema requiredSchema, + HoodieSchema dataSchema, + HoodieSchema requiredSchema, HoodieStorage storage) { return createRecordIterator(); @@ -85,8 +85,8 @@ public class HudiTrinoReaderContext StoragePathInfo storagePathInfo, long start, long length, - Schema dataSchema, - Schema requiredSchema, + HoodieSchema dataSchema, + HoodieSchema requiredSchema, HoodieStorage storage) { return createRecordIterator(); @@ -152,8 +152,10 @@ public class HudiTrinoReaderContext } @Override - public ClosableIterator<IndexedRecord> mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator, Schema skeletonRequiredSchema, ClosableIterator<IndexedRecord> dataFileIterator, Schema dataRequiredSchema, List<Pair<String, Object>> requiredPartitionFieldAndValues) + public ClosableIterator<IndexedRecord> mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator, HoodieSchema skeletonRequiredSchema, ClosableIterator<IndexedRecord> dataFileIterator, HoodieSchema dataRequiredSchema, List<Pair<String, Object>> requiredPartitionFieldAndValues) { - return null; + // Bootstrap merge is not exercised by the Trino connector; reads of bootstrap tables go + // through the regular page-source path. Throwing surfaces accidental use loudly. + throw new UnsupportedOperationException("HudiTrinoReaderContext does not support bootstrap merge"); } }
