This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to tag rfc-105-pre-cleanup
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit a37e4548f3ba58b876d86f4286550ea8da3ee193
Author: voon <[email protected]>
AuthorDate: Wed May 27 20:04:09 2026 +0800

    feat(trino): apply LIMIT pushdown into HudiTableHandle
    
    Mirrors the IcebergMetadata.applyLimit pattern. The connector now accepts
    a LIMIT hint from the planner and threads it through HudiTableHandle so
    HudiSplitSource can use it later to short-circuit partition / file-slice
    listing once row-count estimates from the Hudi metadata table become
    available.
    
    Changes:
    - HudiTableHandle: add an OptionalLong limit parameter to all three
      constructors plus a @JsonProperty getter; introduce withLimit(long),
      used by HudiMetadata.applyLimit. applyPredicates carries the existing
      limit over to the new handle.
    - HudiMetadata.applyLimit: return Optional.empty() when the handle already
      holds a limit less than or equal to the incoming one (no work); otherwise
      return a LimitApplicationResult with limitGuaranteed=false. Mirrors Iceberg.
    - Test call sites of the JSON constructor now pass OptionalLong.empty() for
      the new parameter.
    
    SUPPORTS_LIMIT_PUSHDOWN stays false in TestHudiConnectorTest because
    BaseConnectorTest.testLimitPushdown asserts Output -> TableScan with no
    Limit node, which requires limitGuaranteed=true; a multi-split connector
    cannot promise that without coordinator-side row-count coordination.
    Iceberg/Delta Lake/Hive all keep the flag false for the same reason.
---
 .../java/io/trino/plugin/hudi/HudiMetadata.java    | 22 ++++++++++++--
 .../java/io/trino/plugin/hudi/HudiTableHandle.java | 35 +++++++++++++++++++++-
 .../partition/TestHudiPartitionInfoLoader.java     |  1 +
 .../plugin/hudi/split/TestHudiSplitFactory.java    |  1 +
 4 files changed, 55 insertions(+), 4 deletions(-)

diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
index b959e6dac679..18ee1799f15c 100644
--- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
@@ -38,6 +38,7 @@ import io.trino.spi.connector.ConnectorTableMetadata;
 import io.trino.spi.connector.ConnectorTableVersion;
 import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.LimitApplicationResult;
 import io.trino.spi.connector.RelationColumnsMetadata;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SchemaTablePrefix;
@@ -64,6 +65,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -107,9 +109,6 @@ import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.INDEXING_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 
-// TODO: implement applyLimit(...) to push LIMIT into HudiTableHandle so the 
split loader can
-//       short-circuit partition/file-slice listing once a row-count estimate 
covers the limit.
-//       Also flip SUPPORTS_LIMIT_PUSHDOWN to true in TestHudiConnectorTest 
once implemented.
 public class HudiMetadata
         implements ConnectorMetadata
 {
@@ -181,6 +180,7 @@ public class HudiMetadata
                 ImmutableSet.of(),
                 TupleDomain.all(),
                 TupleDomain.all(),
+                OptionalLong.empty(),
                 hudiTableSchema);
     }
 
@@ -264,6 +264,22 @@ public class HudiMetadata
                 false));
     }
 
+    @Override
+    public Optional<LimitApplicationResult<ConnectorTableHandle>> 
applyLimit(ConnectorSession session, ConnectorTableHandle handle, long limit)
+    {
+        HudiTableHandle table = (HudiTableHandle) handle;
+
+        if (table.getLimit().isPresent() && table.getLimit().getAsLong() <= 
limit) {
+            return Optional.empty(); // stored limit is already as tight or tighter; nothing new to push
+        }
+
+        // limitGuaranteed=false: with multiple splits the connector can't cap the total row count
+        // without coordinator-side coordination, so Trino keeps the Limit operator above the TableScan.
+        // Nothing consumes the stored limit yet; HudiSplitSource is expected to use it to short-circuit
+        // split listing once row-count estimates from the Hudi metadata table are available (TODO).
+        return Optional.of(new 
LimitApplicationResult<>(table.withLimit(limit), false, false));
+    }
+
     @Override
     public Map<String, ColumnHandle> getColumnHandles(ConnectorSession 
session, ConnectorTableHandle tableHandle)
     {
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java
index 1b9efd1c6e18..cfe4801b2f38 100644
--- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java
@@ -33,6 +33,7 @@ import org.apache.hudi.util.Lazy;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.function.Supplier;
 
@@ -54,6 +55,7 @@ public class HudiTableHandle
     private final Set<HiveColumnHandle> constraintColumns;
     private final TupleDomain<HiveColumnHandle> partitionPredicates;
     private final TupleDomain<HiveColumnHandle> regularPredicates;
+    private final OptionalLong limit;
     private final Optional<Lazy<HoodieSchema>> hudiTableSchema;
     // Coordinator-only
     private final transient Optional<Table> table;
@@ -70,11 +72,12 @@ public class HudiTableHandle
             @JsonProperty("orderingColumns") List<HiveColumnHandle> 
orderingColumns,
             @JsonProperty("partitionPredicates") TupleDomain<HiveColumnHandle> 
partitionPredicates,
             @JsonProperty("regularPredicates") TupleDomain<HiveColumnHandle> 
regularPredicates,
+            @JsonProperty("limit") OptionalLong limit,
             @JsonProperty("tableSchemaStr") String tableSchemaStr,
             @JsonProperty("latestCommitTime") String latestCommitTime)
     {
         this(Optional.empty(), Optional.empty(), schemaName, tableName, 
basePath, tableType, partitionColumns, Lazy.lazily(() -> orderingColumns), 
ImmutableSet.of(),
-                partitionPredicates, regularPredicates, 
buildTableSchema(tableSchemaStr), () -> latestCommitTime);
+                partitionPredicates, regularPredicates, limit, 
buildTableSchema(tableSchemaStr), () -> latestCommitTime);
     }
 
     public HudiTableHandle(
@@ -89,6 +92,7 @@ public class HudiTableHandle
             Set<HiveColumnHandle> constraintColumns,
             TupleDomain<HiveColumnHandle> partitionPredicates,
             TupleDomain<HiveColumnHandle> regularPredicates,
+            OptionalLong limit,
             Optional<Lazy<HoodieSchema>> hudiTableSchema)
     {
         this(
@@ -103,6 +107,7 @@ public class HudiTableHandle
                 constraintColumns,
                 partitionPredicates,
                 regularPredicates,
+                limit,
                 hudiTableSchema,
                 () -> lazyMetaClient
                         .get()
@@ -128,6 +133,7 @@ public class HudiTableHandle
             Set<HiveColumnHandle> constraintColumns,
             TupleDomain<HiveColumnHandle> partitionPredicates,
             TupleDomain<HiveColumnHandle> regularPredicates,
+            OptionalLong limit,
             Optional<Lazy<HoodieSchema>> hudiTableSchema,
             Supplier<String> latestCommitTimeSupplier)
     {
@@ -142,6 +148,7 @@ public class HudiTableHandle
         this.constraintColumns = requireNonNull(constraintColumns, 
"constraintColumns is null");
         this.partitionPredicates = requireNonNull(partitionPredicates, 
"partitionPredicates is null");
         this.regularPredicates = requireNonNull(regularPredicates, 
"regularPredicates is null");
+        this.limit = requireNonNull(limit, "limit is null");
         this.hudiTableSchema = requireNonNull(hudiTableSchema, 
"hudiTableSchema is null");
         this.lazyLatestCommitTime = Lazy.lazily(latestCommitTimeSupplier);
     }
@@ -254,6 +261,12 @@ public class HudiTableHandle
         return regularPredicates;
     }
 
+    @JsonProperty
+    public OptionalLong getLimit()
+    {
+        return limit;
+    }
+
     @JsonProperty
     public List<HiveColumnHandle> getOrderingColumns()
     {
@@ -282,6 +295,26 @@ public class HudiTableHandle
                 constraintColumns,
                 partitionPredicates.intersect(partitionTupleDomain),
                 regularPredicates.intersect(regularTupleDomain),
+                limit,
+                hudiTableSchema,
+                this::getLatestCommitTime);
+    }
+
+    HudiTableHandle withLimit(long newLimit)
+    {
+        return new HudiTableHandle(
+                table,
+                lazyMetaClient,
+                schemaName,
+                tableName,
+                basePath,
+                tableType,
+                partitionColumns,
+                lazyOrderingColumns,
+                constraintColumns,
+                partitionPredicates,
+                regularPredicates,
+                OptionalLong.of(newLimit),
                 hudiTableSchema,
                 this::getLatestCommitTime);
     }
diff --git 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java
 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java
index 765777cf855d..097d0285be7b 100644
--- 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java
+++ 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java
@@ -163,6 +163,7 @@ public class TestHudiPartitionInfoLoader
                 ImmutableList.of(),
                 TupleDomain.all(),
                 TupleDomain.all(),
+                java.util.OptionalLong.empty(),
                 "",
                 "101");
         HudiSplitWeightProvider weightProvider = new 
SizeBasedSplitWeightProvider(0.05, DataSize.of(128, MEGABYTE));
diff --git 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java
 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java
index 01a4208e9758..57937af6b0b3 100644
--- 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java
+++ 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java
@@ -182,6 +182,7 @@ public class TestHudiSplitFactory
                 ImmutableList.of(),
                 TupleDomain.all(),
                 TupleDomain.all(),
+                java.util.OptionalLong.empty(),
                 "",
                 "101");
     }

Reply via email to