This is an automated email from the ASF dual-hosted git repository. voonhous pushed a commit to tag rfc-105-pre-cleanup in repository https://gitbox.apache.org/repos/asf/hudi.git
commit a37e4548f3ba58b876d86f4286550ea8da3ee193 Author: voon <[email protected]> AuthorDate: Wed May 27 20:04:09 2026 +0800 feat(trino): apply LIMIT pushdown into HudiTableHandle Mirrors the IcebergMetadata.applyLimit pattern. The connector now accepts a LIMIT hint from the planner and threads it through HudiTableHandle so HudiSplitSource can use it later to short-circuit partition / file-slice listing once row-count estimates from the Hudi metadata table become available. Changes: - HudiTableHandle: add OptionalLong limit to all three constructors plus @JsonProperty getter; introduce withLimit(long) for the apply-flow. applyPredicates propagates the existing limit through. - HudiMetadata.applyLimit: return Optional.empty() when a limit is already stored and the incoming limit is equal to or wider than it (no work), otherwise return LimitApplicationResult with limitGuaranteed=false. Mirrors Iceberg. - Test constructors updated with OptionalLong.empty() for the new param. SUPPORTS_LIMIT_PUSHDOWN stays false in TestHudiConnectorTest because BaseConnectorTest.testLimitPushdown asserts Output -> TableScan with no Limit node, which requires limitGuaranteed=true; a multi-split connector cannot promise that without coordinator-side row-count coordination. Iceberg/Delta Lake/Hive all keep the flag false for the same reason. 
--- .../java/io/trino/plugin/hudi/HudiMetadata.java | 22 ++++++++++++-- .../java/io/trino/plugin/hudi/HudiTableHandle.java | 35 +++++++++++++++++++++- .../partition/TestHudiPartitionInfoLoader.java | 1 + .../plugin/hudi/split/TestHudiSplitFactory.java | 1 + 4 files changed, 55 insertions(+), 4 deletions(-) diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java index b959e6dac679..18ee1799f15c 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java @@ -38,6 +38,7 @@ import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.ConnectorTableVersion; import io.trino.spi.connector.Constraint; import io.trino.spi.connector.ConstraintApplicationResult; +import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.RelationColumnsMetadata; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; @@ -64,6 +65,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -107,9 +109,6 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ import static org.apache.hudi.common.table.timeline.HoodieTimeline.INDEXING_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; -// TODO: implement applyLimit(...) to push LIMIT into HudiTableHandle so the split loader can -// short-circuit partition/file-slice listing once a row-count estimate covers the limit. -// Also flip SUPPORTS_LIMIT_PUSHDOWN to true in TestHudiConnectorTest once implemented. 
public class HudiMetadata implements ConnectorMetadata { @@ -181,6 +180,7 @@ public class HudiMetadata ImmutableSet.of(), TupleDomain.all(), TupleDomain.all(), + OptionalLong.empty(), hudiTableSchema); } @@ -264,6 +264,22 @@ public class HudiMetadata false)); } + @Override + public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(ConnectorSession session, ConnectorTableHandle handle, long limit) + { + HudiTableHandle table = (HudiTableHandle) handle; + + if (table.getLimit().isPresent() && table.getLimit().getAsLong() <= limit) { + return Optional.empty(); + } + + // limitGuaranteed=false: the connector can't bound row count across splits without + // coordinator-side coordination. Trino keeps the Limit operator above the TableScan. + // The stored limit is consumed by HudiSplitSource for split-listing short-circuit when + // row-count estimates from the Hudi metadata table become available (TODO). + return Optional.of(new LimitApplicationResult<>(table.withLimit(limit), false, false)); + } + @Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) { diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java index 1b9efd1c6e18..cfe4801b2f38 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java @@ -33,6 +33,7 @@ import org.apache.hudi.util.Lazy; import java.util.List; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.function.Supplier; @@ -54,6 +55,7 @@ public class HudiTableHandle private final Set<HiveColumnHandle> constraintColumns; private final TupleDomain<HiveColumnHandle> partitionPredicates; private final TupleDomain<HiveColumnHandle> regularPredicates; + private final OptionalLong limit; private final 
Optional<Lazy<HoodieSchema>> hudiTableSchema; // Coordinator-only private final transient Optional<Table> table; @@ -70,11 +72,12 @@ public class HudiTableHandle @JsonProperty("orderingColumns") List<HiveColumnHandle> orderingColumns, @JsonProperty("partitionPredicates") TupleDomain<HiveColumnHandle> partitionPredicates, @JsonProperty("regularPredicates") TupleDomain<HiveColumnHandle> regularPredicates, + @JsonProperty("limit") OptionalLong limit, @JsonProperty("tableSchemaStr") String tableSchemaStr, @JsonProperty("latestCommitTime") String latestCommitTime) { this(Optional.empty(), Optional.empty(), schemaName, tableName, basePath, tableType, partitionColumns, Lazy.lazily(() -> orderingColumns), ImmutableSet.of(), - partitionPredicates, regularPredicates, buildTableSchema(tableSchemaStr), () -> latestCommitTime); + partitionPredicates, regularPredicates, limit, buildTableSchema(tableSchemaStr), () -> latestCommitTime); } public HudiTableHandle( @@ -89,6 +92,7 @@ public class HudiTableHandle Set<HiveColumnHandle> constraintColumns, TupleDomain<HiveColumnHandle> partitionPredicates, TupleDomain<HiveColumnHandle> regularPredicates, + OptionalLong limit, Optional<Lazy<HoodieSchema>> hudiTableSchema) { this( @@ -103,6 +107,7 @@ public class HudiTableHandle constraintColumns, partitionPredicates, regularPredicates, + limit, hudiTableSchema, () -> lazyMetaClient .get() @@ -128,6 +133,7 @@ public class HudiTableHandle Set<HiveColumnHandle> constraintColumns, TupleDomain<HiveColumnHandle> partitionPredicates, TupleDomain<HiveColumnHandle> regularPredicates, + OptionalLong limit, Optional<Lazy<HoodieSchema>> hudiTableSchema, Supplier<String> latestCommitTimeSupplier) { @@ -142,6 +148,7 @@ public class HudiTableHandle this.constraintColumns = requireNonNull(constraintColumns, "constraintColumns is null"); this.partitionPredicates = requireNonNull(partitionPredicates, "partitionPredicates is null"); this.regularPredicates = requireNonNull(regularPredicates, 
"regularPredicates is null"); + this.limit = requireNonNull(limit, "limit is null"); this.hudiTableSchema = requireNonNull(hudiTableSchema, "hudiTableSchema is null"); this.lazyLatestCommitTime = Lazy.lazily(latestCommitTimeSupplier); } @@ -254,6 +261,12 @@ public class HudiTableHandle return regularPredicates; } + @JsonProperty + public OptionalLong getLimit() + { + return limit; + } + @JsonProperty public List<HiveColumnHandle> getOrderingColumns() { @@ -282,6 +295,26 @@ public class HudiTableHandle constraintColumns, partitionPredicates.intersect(partitionTupleDomain), regularPredicates.intersect(regularTupleDomain), + limit, + hudiTableSchema, + this::getLatestCommitTime); + } + + HudiTableHandle withLimit(long newLimit) + { + return new HudiTableHandle( + table, + lazyMetaClient, + schemaName, + tableName, + basePath, + tableType, + partitionColumns, + lazyOrderingColumns, + constraintColumns, + partitionPredicates, + regularPredicates, + OptionalLong.of(newLimit), hudiTableSchema, this::getLatestCommitTime); } diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java index 765777cf855d..097d0285be7b 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java @@ -163,6 +163,7 @@ public class TestHudiPartitionInfoLoader ImmutableList.of(), TupleDomain.all(), TupleDomain.all(), + java.util.OptionalLong.empty(), "", "101"); HudiSplitWeightProvider weightProvider = new SizeBasedSplitWeightProvider(0.05, DataSize.of(128, MEGABYTE)); diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java index 01a4208e9758..57937af6b0b3 100644 --- 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java @@ -182,6 +182,7 @@ public class TestHudiSplitFactory ImmutableList.of(), TupleDomain.all(), TupleDomain.all(), + java.util.OptionalLong.empty(), "", "101"); }
