This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git


The following commit(s) were added to refs/heads/main by this push:
     new 5697da8  Support incremental snapshots reading (#97)
5697da8 is described below

commit 5697da8de19a60e52842fd86d612c99e35aa0877
Author: rfyu <[email protected]>
AuthorDate: Mon Jun 16 16:47:17 2025 +0800

    Support incremental snapshots reading (#97)
---
 pom.xml                                            |   2 +-
 .../org/apache/paimon/trino/TrinoConnector.java    |  25 ++-
 .../apache/paimon/trino/TrinoConnectorFactory.java |  11 +-
 .../java/org/apache/paimon/trino/TrinoModule.java  |  13 ++
 .../org/apache/paimon/trino/TrinoSplitManager.java |  19 +-
 .../org/apache/paimon/trino/TrinoTableHandle.java  |   8 +-
 .../apache/paimon/trino/TrinoTableOptionUtils.java |   2 +-
 .../apache/paimon/trino/catalog/TrinoCatalog.java  |  26 +--
 .../trino/functions/TrinoFunctionProvider.java     |  50 +++++
 .../tablechanges/TableChangesFunction.java         | 231 +++++++++++++++++++++
 .../TableChangesFunctionProcessor.java             |  69 ++++++
 .../TableChangesFunctionProcessorProvider.java     |  55 +++++
 .../tablechanges/TableChangesFunctionProvider.java |  45 ++++
 .../java/org/apache/paimon/trino/TrinoITCase.java  |  28 +++
 14 files changed, 561 insertions(+), 23 deletions(-)

diff --git a/pom.xml b/pom.xml
index 39ed2db..e57fda0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@ under the License.
     </scm>
 
     <properties>
-        <paimon.version>1.0-SNAPSHOT</paimon.version>
+        <paimon.version>1.0.0</paimon.version>
         <target.java.version>21</target.java.version>
         <jdk.test.version>21</jdk.test.version>
         <trino.version>440</trino.version>
diff --git a/src/main/java/org/apache/paimon/trino/TrinoConnector.java b/src/main/java/org/apache/paimon/trino/TrinoConnector.java
index dff66e5..36824cf 100644
--- a/src/main/java/org/apache/paimon/trino/TrinoConnector.java
+++ b/src/main/java/org/apache/paimon/trino/TrinoConnector.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.trino;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
+
 import io.trino.spi.connector.Connector;
 import io.trino.spi.connector.ConnectorMetadata;
 import io.trino.spi.connector.ConnectorNodePartitioningProvider;
@@ -26,10 +28,14 @@ import io.trino.spi.connector.ConnectorPageSourceProvider;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.function.FunctionProvider;
+import io.trino.spi.function.table.ConnectorTableFunction;
 import io.trino.spi.session.PropertyMetadata;
 import io.trino.spi.transaction.IsolationLevel;
 
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 
 import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
 import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
@@ -44,6 +50,8 @@ public class TrinoConnector implements Connector {
     private final ConnectorNodePartitioningProvider trinoNodePartitioningProvider;
     private final List<PropertyMetadata<?>> tableProperties;
     private final List<PropertyMetadata<?>> sessionProperties;
+    private final Set<ConnectorTableFunction> tableFunctions;
+    private final FunctionProvider functionProvider;
 
     public TrinoConnector(
             ConnectorMetadata trinoMetadata,
@@ -52,7 +60,9 @@ public class TrinoConnector implements Connector {
             ConnectorPageSinkProvider trinoPageSinkProvider,
             ConnectorNodePartitioningProvider trinoNodePartitioningProvider,
             TrinoTableOptions trinoTableOptions,
-            TrinoSessionProperties trinoSessionProperties) {
+            TrinoSessionProperties trinoSessionProperties,
+            Set<ConnectorTableFunction> tableFunctions,
+            FunctionProvider functionProvider) {
         this.trinoMetadata = requireNonNull(trinoMetadata, "trinoMetadata is null");
         this.trinoSplitManager = requireNonNull(trinoSplitManager, "trinoSplitManager is null");
         this.trinoPageSourceProvider =
@@ -64,6 +74,9 @@ public class TrinoConnector implements Connector {
                         trinoNodePartitioningProvider, "trinoNodePartitioningProvider is null");
         this.tableProperties = trinoTableOptions.getTableProperties();
         this.sessionProperties = trinoSessionProperties.getSessionProperties();
+        this.tableFunctions =
+                ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null"));
+        this.functionProvider = requireNonNull(functionProvider, "functionProvider is null");
     }
 
     @Override
@@ -108,4 +121,14 @@ public class TrinoConnector implements Connector {
     public ConnectorNodePartitioningProvider getNodePartitioningProvider() {
         return trinoNodePartitioningProvider;
     }
+
+    @Override
+    public Set<ConnectorTableFunction> getTableFunctions() {
+        return tableFunctions;
+    }
+
+    @Override
+    public Optional<FunctionProvider> getFunctionProvider() {
+        return Optional.of(functionProvider);
+    }
 }
diff --git a/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java b/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
index 4327699..2f244b1 100644
--- a/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
+++ b/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
@@ -22,6 +22,7 @@ import org.apache.paimon.utils.StringUtils;
 
 import com.google.inject.Binder;
 import com.google.inject.Injector;
+import com.google.inject.Key;
 import com.google.inject.Module;
 import io.airlift.bootstrap.Bootstrap;
 import io.airlift.json.JsonModule;
@@ -40,6 +41,8 @@ import io.trino.spi.classloader.ThreadContextClassLoader;
 import io.trino.spi.connector.Connector;
 import io.trino.spi.connector.ConnectorContext;
 import io.trino.spi.connector.ConnectorFactory;
+import io.trino.spi.function.FunctionProvider;
+import io.trino.spi.function.table.ConnectorTableFunction;
 import io.trino.spi.type.TypeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +57,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 /** Trino {@link ConnectorFactory}. */
 public class TrinoConnectorFactory implements ConnectorFactory {
@@ -137,6 +141,9 @@ public class TrinoConnectorFactory implements ConnectorFactory {
             TrinoSessionProperties trinoSessionProperties =
                     injector.getInstance(TrinoSessionProperties.class);
             TrinoTableOptions trinoTableOptions = injector.getInstance(TrinoTableOptions.class);
+            Set<ConnectorTableFunction> connectorTableFunctions =
+                    injector.getInstance(new Key<>() {});
+            FunctionProvider functionProvider = injector.getInstance(FunctionProvider.class);
 
             return new TrinoConnector(
                     new ClassLoaderSafeConnectorMetadata(trinoMetadata, classLoader),
@@ -147,7 +154,9 @@ public class TrinoConnectorFactory implements ConnectorFactory {
                             trinoPageSinkProvider, classLoader),
                     trinoNodePartitioningProvider,
                     trinoTableOptions,
-                    trinoSessionProperties);
+                    trinoSessionProperties,
+                    connectorTableFunctions,
+                    functionProvider);
         }
     }
 
diff --git a/src/main/java/org/apache/paimon/trino/TrinoModule.java b/src/main/java/org/apache/paimon/trino/TrinoModule.java
index f629736..cbdb74e 100644
--- a/src/main/java/org/apache/paimon/trino/TrinoModule.java
+++ b/src/main/java/org/apache/paimon/trino/TrinoModule.java
@@ -19,13 +19,20 @@
 package org.apache.paimon.trino;
 
 import org.apache.paimon.options.Options;
+import org.apache.paimon.trino.functions.TrinoFunctionProvider;
+import org.apache.paimon.trino.functions.tablechanges.TableChangesFunctionProcessorProvider;
+import org.apache.paimon.trino.functions.tablechanges.TableChangesFunctionProvider;
 
 import com.google.inject.Binder;
 import com.google.inject.Module;
+import com.google.inject.Scopes;
+import io.trino.spi.function.FunctionProvider;
+import io.trino.spi.function.table.ConnectorTableFunction;
 
 import java.util.Map;
 
 import static com.google.inject.Scopes.SINGLETON;
+import static com.google.inject.multibindings.Multibinder.newSetBinder;
 
 /** Module for binding instance. */
 public class TrinoModule implements Module {
@@ -45,5 +52,11 @@ public class TrinoModule implements Module {
         binder.bind(TrinoNodePartitioningProvider.class).in(SINGLETON);
         binder.bind(TrinoSessionProperties.class).in(SINGLETON);
         binder.bind(TrinoTableOptions.class).in(SINGLETON);
+        newSetBinder(binder, ConnectorTableFunction.class)
+                .addBinding()
+                .toProvider(TableChangesFunctionProvider.class)
+                .in(Scopes.SINGLETON);
+        binder.bind(FunctionProvider.class).to(TrinoFunctionProvider.class).in(Scopes.SINGLETON);
+        binder.bind(TableChangesFunctionProcessorProvider.class).in(SINGLETON);
     }
 }
diff --git a/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java b/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java
index 120b0e9..35b402f 100644
--- a/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java
+++ b/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java
@@ -31,6 +31,7 @@ import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTransactionHandle;
 import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.DynamicFilter;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -57,15 +58,25 @@ public class TrinoSplitManager implements ConnectorSplitManager {
             ConnectorTableHandle table,
             DynamicFilter dynamicFilter,
             Constraint constraint) {
-        return getSplits(table, session);
+        return getSplits((TrinoTableHandle) table, session);
+    }
+
+    @Override
+    public ConnectorSplitSource getSplits(
+            ConnectorTransactionHandle transaction,
+            ConnectorSession session,
+            ConnectorTableFunctionHandle function) {
+        if (function instanceof TrinoTableHandle) {
+            return getSplits((TrinoTableHandle) function, session);
+        }
+        throw new IllegalStateException("Unknown table function: " + function);
     }
 
     protected ConnectorSplitSource getSplits(
-            ConnectorTableHandle connectorTableHandle, ConnectorSession session) {
+            TrinoTableHandle tableHandle, ConnectorSession session) {
         // TODO dynamicFilter?
         // TODO what is constraint?
 
-        TrinoTableHandle tableHandle = (TrinoTableHandle) connectorTableHandle;
         Table table = tableHandle.tableWithDynamicOptions(trinoCatalog, session);
         ReadBuilder readBuilder = table.newReadBuilder();
         new TrinoFilterConverter(table.rowType())
@@ -89,6 +100,6 @@ public class TrinoSplitManager implements ConnectorSplitManager {
                                                                 
minimumSplitWeight),
                                                         1.0)))
                         .collect(Collectors.toList()),
-                ((TrinoTableHandle) connectorTableHandle).getLimit());
+                tableHandle.getLimit());
     }
 }
diff --git a/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java b/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
index d5dbf44..163c6a1 100644
--- a/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
+++ b/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
@@ -34,6 +34,7 @@ import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTableMetadata;
 import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
 import io.trino.spi.predicate.TupleDomain;
 
 import java.util.Collections;
@@ -47,7 +48,10 @@ import java.util.stream.Collectors;
 
 /** Trino {@link ConnectorTableHandle}. */
 public class TrinoTableHandle
-        implements ConnectorTableHandle, ConnectorInsertTableHandle, ConnectorOutputTableHandle {
+        implements ConnectorTableHandle,
+                ConnectorInsertTableHandle,
+                ConnectorOutputTableHandle,
+                ConnectorTableFunctionHandle {
 
     private final String schemaName;
     private final String tableName;
@@ -75,7 +79,7 @@ public class TrinoTableHandle
             @JsonProperty("tableName") String tableName,
             @JsonProperty("dynamicOptions") Map<String, String> dynamicOptions,
             @JsonProperty("filter") TupleDomain<TrinoColumnHandle> filter,
-            @JsonProperty("projection") Optional<List<ColumnHandle>> projectedColumns,
+            @JsonProperty("projectedColumns") Optional<List<ColumnHandle>> projectedColumns,
             @JsonProperty("limit") OptionalLong limit) {
         this.schemaName = schemaName;
         this.tableName = tableName;
diff --git a/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java b/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
index 3504fb5..7eca2f6 100644
--- a/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
+++ b/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
@@ -124,7 +124,7 @@ public class TrinoTableOptionUtils {
         }
     }
 
-    private static String convertOptionKey(String key) {
+    public static String convertOptionKey(String key) {
         String regex = "[.\\-]";
         Pattern pattern = Pattern.compile(regex);
         Matcher matcher = pattern.matcher(key);
diff --git a/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java b/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
index debafbc..b6e04c8 100644
--- a/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
+++ b/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
@@ -23,10 +23,10 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.PropertyChange;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.security.SecurityContext;
@@ -109,6 +109,11 @@ public class TrinoCatalog implements Catalog {
         return current.options();
     }
 
+    @Override
+    public boolean caseSensitive() {
+        return current.caseSensitive();
+    }
+
     @Override
     public FileIO fileIO() {
         if (!inited) {
@@ -140,13 +145,14 @@ public class TrinoCatalog implements Catalog {
     }
 
     @Override
-    public Table getTable(Identifier identifier) throws TableNotExistException {
-        return current.getTable(identifier);
+    public void alterDatabase(String s, List<PropertyChange> list, boolean b)
+            throws DatabaseNotExistException {
+        current.alterDatabase(s, list, b);
     }
 
     @Override
-    public Path getTableLocation(Identifier identifier) {
-        return current.getTableLocation(identifier);
+    public Table getTable(Identifier identifier) throws TableNotExistException {
+        return current.getTable(identifier);
     }
 
     @Override
@@ -190,8 +196,7 @@ public class TrinoCatalog implements Catalog {
     }
 
     @Override
-    public List<PartitionEntry> listPartitions(Identifier identifier)
-            throws TableNotExistException {
+    public List<Partition> listPartitions(Identifier identifier) throws TableNotExistException {
         return current.listPartitions(identifier);
     }
 
@@ -213,9 +218,4 @@ public class TrinoCatalog implements Catalog {
             throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
         current.alterTable(identifier, change, ignoreIfNotExists);
     }
-
-    @Override
-    public boolean allowUpperCase() {
-        return current.allowUpperCase();
-    }
 }
diff --git a/src/main/java/org/apache/paimon/trino/functions/TrinoFunctionProvider.java b/src/main/java/org/apache/paimon/trino/functions/TrinoFunctionProvider.java
new file mode 100644
index 0000000..4ced19f
--- /dev/null
+++ b/src/main/java/org/apache/paimon/trino/functions/TrinoFunctionProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino.functions;
+
+import org.apache.paimon.trino.TrinoTableHandle;
+import org.apache.paimon.trino.functions.tablechanges.TableChangesFunctionProcessorProvider;
+
+import com.google.inject.Inject;
+import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionProcessorProvider;
+import io.trino.spi.function.FunctionProvider;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
+import io.trino.spi.function.table.TableFunctionProcessorProvider;
+
+/** TrinoFunctionProvider. */
+public class TrinoFunctionProvider implements FunctionProvider {
+
+    private final TableChangesFunctionProcessorProvider tableChangesFunctionProcessorProvider;
+
+    @Inject
+    public TrinoFunctionProvider(
+            TableChangesFunctionProcessorProvider tableChangesFunctionProcessorProvider) {
+        this.tableChangesFunctionProcessorProvider = tableChangesFunctionProcessorProvider;
+    }
+
+    @Override
+    public TableFunctionProcessorProvider getTableFunctionProcessorProvider(
+            ConnectorTableFunctionHandle functionHandle) {
+        if (functionHandle instanceof TrinoTableHandle) {
+            return new ClassLoaderSafeTableFunctionProcessorProvider(
+                    tableChangesFunctionProcessorProvider, getClass().getClassLoader());
+        }
+        return FunctionProvider.super.getTableFunctionProcessorProvider(functionHandle);
+    }
+}
diff --git a/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunction.java b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunction.java
new file mode 100644
index 0000000..7c1ef88
--- /dev/null
+++ b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunction.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino.functions.tablechanges;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.trino.TrinoColumnHandle;
+import org.apache.paimon.trino.TrinoMetadata;
+import org.apache.paimon.trino.TrinoMetadataFactory;
+import org.apache.paimon.trino.TrinoTableHandle;
+import org.apache.paimon.trino.TrinoTableOptionUtils;
+import org.apache.paimon.trino.TrinoTypeUtils;
+import org.apache.paimon.trino.catalog.TrinoCatalog;
+
+import com.google.inject.Inject;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorAccessControl;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.function.table.AbstractConnectorTableFunction;
+import io.trino.spi.function.table.Argument;
+import io.trino.spi.function.table.Descriptor;
+import io.trino.spi.function.table.ScalarArgument;
+import io.trino.spi.function.table.ScalarArgumentSpecification;
+import io.trino.spi.function.table.TableFunctionAnalysis;
+import io.trino.spi.predicate.TupleDomain;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
+import static io.trino.spi.function.table.ReturnTypeSpecification.GenericTable.GENERIC_TABLE;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+
+/** TableChangesFunction. */
+public class TableChangesFunction extends AbstractConnectorTableFunction {
+
+    private static final Slice INVALID_VALUE = Slices.utf8Slice("invalid");
+    private static final String FUNCTION_NAME = "table_changes";
+    private static final String SCHEMA_NAME_VAR_NAME = "SCHEMA_NAME";
+    private static final String TABLE_NAME_VAR_NAME = "TABLE_NAME";
+    private static final String INCREMENTAL_BETWEEN_SCAN_MODE =
+            TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE.key())
+                    .toUpperCase(ENGLISH);
+    private static final String INCREMENTAL_BETWEEN_TIMESTAMP =
+            TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key())
+                    .toUpperCase(ENGLISH);
+    private static final String INCREMENTAL_BETWEEN =
+            TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN.key())
+                    .toUpperCase(ENGLISH);
+    private final TrinoMetadata trinoMetadata;
+
+    @Inject
+    public TableChangesFunction(TrinoMetadataFactory trinoMetadataFactory) {
+        super(
+                "system",
+                FUNCTION_NAME,
+                ImmutableList.of(
+                        ScalarArgumentSpecification.builder()
+                                .name(SCHEMA_NAME_VAR_NAME)
+                                .type(VARCHAR)
+                                .build(),
+                        ScalarArgumentSpecification.builder()
+                                .name(TABLE_NAME_VAR_NAME)
+                                .type(VARCHAR)
+                                .build(),
+                        ScalarArgumentSpecification.builder()
+                                .name(INCREMENTAL_BETWEEN_SCAN_MODE)
+                                .defaultValue(
+                                        Slices.utf8Slice(
+                                                CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE
+                                                        .defaultValue()
+                                                        .getValue()))
+                                .type(VARCHAR)
+                                .build(),
+                        ScalarArgumentSpecification.builder()
+                                .name(INCREMENTAL_BETWEEN)
+                                .defaultValue(INVALID_VALUE)
+                                .type(VARCHAR)
+                                .build(),
+                        ScalarArgumentSpecification.builder()
+                                .name(INCREMENTAL_BETWEEN_TIMESTAMP)
+                                .defaultValue(INVALID_VALUE)
+                                .type(VARCHAR)
+                                .build()),
+                GENERIC_TABLE);
+        this.trinoMetadata =
+                requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is null").create();
+    }
+
+    @Override
+    public TableFunctionAnalysis analyze(
+            ConnectorSession session,
+            ConnectorTransactionHandle transaction,
+            Map<String, Argument> arguments,
+            ConnectorAccessControl accessControl) {
+        String schema = getSchemaName(arguments);
+        String table = getTableName(arguments);
+
+        Slice incrementalBetweenValue =
+                (Slice) ((ScalarArgument) arguments.get(INCREMENTAL_BETWEEN)).getValue();
+        Slice incrementalBetweenTimestamp =
+                (Slice) ((ScalarArgument) arguments.get(INCREMENTAL_BETWEEN_TIMESTAMP)).getValue();
+        if (incrementalBetweenValue.equals(INVALID_VALUE)
+                && incrementalBetweenTimestamp.equals(INVALID_VALUE)) {
+            throw new TrinoException(
+                    INVALID_FUNCTION_ARGUMENT,
+                    "Either "
+                            + INCREMENTAL_BETWEEN
+                            + " or "
+                            + INCREMENTAL_BETWEEN_TIMESTAMP
+                            + " must be provided");
+        }
+
+        SchemaTableName schemaTableName = new SchemaTableName(schema, table);
+        try {
+            TrinoCatalog catalog = trinoMetadata.catalog();
+            catalog.initSession(session);
+            Table paimonTable = catalog.getTable(Identifier.create(schema, table));
+            Map<String, String> options = new HashMap<>(paimonTable.options());
+            if (!incrementalBetweenValue.equals(INVALID_VALUE)) {
+                options.put(
+                        CoreOptions.INCREMENTAL_BETWEEN.key(),
+                        incrementalBetweenValue.toStringUtf8());
+            }
+            if (!incrementalBetweenTimestamp.equals(INVALID_VALUE)) {
+                options.put(
+                        CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key(),
+                        incrementalBetweenTimestamp.toStringUtf8());
+            }
+
+            ImmutableList.Builder<Descriptor.Field> columns = ImmutableList.builder();
+            List<ColumnHandle> projectedColumns = new ArrayList<>();
+            paimonTable.rowType().getFields().stream()
+                    .forEach(
+                            column -> {
+                                columns.add(
+                                        new Descriptor.Field(
+                                                column.name(),
+                                                Optional.of(
+                                                        TrinoTypeUtils.fromPaimonType(
+                                                                column.type()))));
+                                projectedColumns.add(
+                                        TrinoColumnHandle.of(column.name(), column.type()));
+                            });
+            return TableFunctionAnalysis.builder()
+                    .returnedType(new Descriptor(columns.build()))
+                    .handle(
+                            new TrinoTableHandle(
+                                    schema,
+                                    table,
+                                    options,
+                                    TupleDomain.all(),
+                                    Optional.of(projectedColumns),
+                                    OptionalLong.empty()))
+                    .build();
+        } catch (Catalog.TableNotExistException e) {
+            throw new TrinoException(
+                    INVALID_FUNCTION_ARGUMENT, "Table not found: " + schemaTableName);
+        }
+    }
+
+    private static String getSchemaName(Map<String, Argument> arguments) {
+        if (argumentExists(arguments, SCHEMA_NAME_VAR_NAME)) {
+            return ((Slice)
+                            checkNonNull(
+                                    ((ScalarArgument) arguments.get(SCHEMA_NAME_VAR_NAME))
+                                            .getValue()))
+                    .toStringUtf8();
+        }
+        throw new TrinoException(
+                INVALID_FUNCTION_ARGUMENT, SCHEMA_NAME_VAR_NAME + " argument not found");
+    }
+
+    private static String getTableName(Map<String, Argument> arguments) {
+        if (argumentExists(arguments, TABLE_NAME_VAR_NAME)) {
+            return ((Slice)
+                            checkNonNull(
+                                    ((ScalarArgument) arguments.get(TABLE_NAME_VAR_NAME))
+                                            .getValue()))
+                    .toStringUtf8();
+        }
+        throw new TrinoException(
+                INVALID_FUNCTION_ARGUMENT, TABLE_NAME_VAR_NAME + " argument not found");
+    }
+
+    private static boolean argumentExists(Map<String, Argument> arguments, String key) {
+        Argument argument = arguments.get(key);
+        if (argument instanceof ScalarArgument) {
+            return !((ScalarArgument) argument).getNullableValue().isNull();
+        }
+        throw new IllegalArgumentException("Unsupported argument type: " + argument);
+    }
+
+    private static Object checkNonNull(Object argumentValue) {
+        if (argumentValue == null) {
+            throw new TrinoException(
+                    INVALID_FUNCTION_ARGUMENT, FUNCTION_NAME + " arguments may not be null");
+        }
+        return argumentValue;
+    }
+}
diff --git a/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProcessor.java b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProcessor.java
new file mode 100644
index 0000000..4acdb57
--- /dev/null
+++ b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProcessor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino.functions.tablechanges;
+
+import org.apache.paimon.trino.TrinoPageSourceProvider;
+import org.apache.paimon.trino.TrinoSplit;
+import org.apache.paimon.trino.TrinoTableHandle;
+
+import io.trino.spi.Page;
+import io.trino.spi.connector.ConnectorPageSource;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.DynamicFilter;
+import io.trino.spi.function.table.TableFunctionProcessorState;
+import io.trino.spi.function.table.TableFunctionSplitProcessor;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+import static io.trino.spi.function.table.TableFunctionProcessorState.Finished.FINISHED;
+
+/**
+ * {@link TableFunctionSplitProcessor} that reads one {@link TrinoSplit} through a
+ * {@link ConnectorPageSource} and returns its pages one {@link #process()} call at a time.
+ */
+public class TableChangesFunctionProcessor implements TableFunctionSplitProcessor {
+
+    private static final Page EMPTY_PAGE = new Page(0);
+
+    private final ConnectorPageSource pageSource;
+
+    // Set once the page source has been closed, so repeated process() calls close it only once.
+    private boolean closed;
+
+    /**
+     * Creates the page source for {@code split} from the columns projected on {@code handle}.
+     *
+     * @param session session forwarded to the page source provider
+     * @param handle table handle supplying the projected columns
+     * @param split split to read
+     * @param pageSourceProvider provider used to create the page source
+     */
+    public TableChangesFunctionProcessor(
+            ConnectorSession session,
+            TrinoTableHandle handle,
+            TrinoSplit split,
+            TrinoPageSourceProvider pageSourceProvider) {
+        // NOTE(review): getProjectedColumns().get() presumably unwraps an Optional; confirm it
+        // is always populated for handles that reach this processor.
+        this.pageSource =
+                pageSourceProvider.createPageSource(
+                        null, // no transaction handle is passed to the provider
+                        session,
+                        split,
+                        (ConnectorTableHandle) handle,
+                        handle.getProjectedColumns().get(),
+                        DynamicFilter.EMPTY);
+    }
+
+    /**
+     * Produces the next state for this split.
+     *
+     * @return {@code FINISHED} once the page source reports it is finished (the source is closed
+     *     at that point); otherwise the next page, or an empty page when the source returned
+     *     {@code null}
+     * @throws UncheckedIOException if closing the page source fails
+     */
+    @Override
+    public TableFunctionProcessorState process() {
+        if (pageSource.isFinished()) {
+            closePageSource();
+            return FINISHED;
+        }
+        Page dataPage = pageSource.getNextPage();
+        // The source may return null before it is finished; report an empty page in that case.
+        return TableFunctionProcessorState.Processed.produced(
+                dataPage == null ? EMPTY_PAGE : dataPage);
+    }
+
+    /** Closes the page source at most once; this class releases it nowhere else. */
+    private void closePageSource() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        try {
+            pageSource.close();
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to close table changes page source", e);
+        }
+    }
+}
diff --git 
a/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProcessorProvider.java
 
b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProcessorProvider.java
new file mode 100644
index 0000000..be807d0
--- /dev/null
+++ 
b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProcessorProvider.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino.functions.tablechanges;
+
+import org.apache.paimon.trino.TrinoPageSourceProvider;
+import org.apache.paimon.trino.TrinoSplit;
+import org.apache.paimon.trino.TrinoTableHandle;
+
+import com.google.inject.Inject;
+import 
io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionSplitProcessor;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
+import io.trino.spi.function.table.TableFunctionProcessorProvider;
+import io.trino.spi.function.table.TableFunctionSplitProcessor;
+
+/**
+ * {@link TableFunctionProcessorProvider} that creates a {@link TableChangesFunctionProcessor}
+ * for each split, backed by the injected {@link TrinoPageSourceProvider}.
+ */
+public class TableChangesFunctionProcessorProvider implements TableFunctionProcessorProvider {
+
+    private final TrinoPageSourceProvider pageSourceProvider;
+
+    /**
+     * @param pageSourceProvider provider handed to every processor this class creates
+     */
+    @Inject
+    public TableChangesFunctionProcessorProvider(TrinoPageSourceProvider pageSourceProvider) {
+        this.pageSourceProvider = pageSourceProvider;
+    }
+
+    /**
+     * Builds the processor for one split.
+     *
+     * @param session session forwarded to the processor
+     * @param handle function handle; cast to {@link TrinoTableHandle}
+     * @param split split to process; cast to {@link TrinoSplit}
+     * @return the processor wrapped in a {@link ClassLoaderSafeTableFunctionSplitProcessor} that
+     *     uses this class's class loader
+     */
+    @Override
+    public TableFunctionSplitProcessor getSplitProcessor(
+            ConnectorSession session, ConnectorTableFunctionHandle handle, ConnectorSplit split) {
+        return new ClassLoaderSafeTableFunctionSplitProcessor(
+                new TableChangesFunctionProcessor(
+                        session,
+                        (TrinoTableHandle) handle,
+                        (TrinoSplit) split,
+                        pageSourceProvider),
+                getClass().getClassLoader());
+    }
+}
diff --git 
a/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProvider.java
 
b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProvider.java
new file mode 100644
index 0000000..95736d7
--- /dev/null
+++ 
b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino.functions.tablechanges;
+
+import org.apache.paimon.trino.TrinoMetadataFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorTableFunction;
+import io.trino.spi.function.table.ConnectorTableFunction;
+
+import static java.util.Objects.requireNonNull;
+
+/** TableChangesFunctionProvider. */
+public class TableChangesFunctionProvider implements 
Provider<ConnectorTableFunction> {
+    private final TrinoMetadataFactory trinoMetadataFactory;
+
+    @Inject
+    public TableChangesFunctionProvider(TrinoMetadataFactory 
trinoMetadataFactory) {
+        this.trinoMetadataFactory =
+                requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is 
null");
+    }
+
+    @Override
+    public ConnectorTableFunction get() {
+        return new ClassLoaderSafeConnectorTableFunction(
+                new TableChangesFunction(trinoMetadataFactory), 
getClass().getClassLoader());
+    }
+}
diff --git a/src/test/java/org/apache/paimon/trino/TrinoITCase.java 
b/src/test/java/org/apache/paimon/trino/TrinoITCase.java
index 1d460c4..c6f17d2 100644
--- a/src/test/java/org/apache/paimon/trino/TrinoITCase.java
+++ b/src/test/java/org/apache/paimon/trino/TrinoITCase.java
@@ -49,6 +49,7 @@ import org.apache.paimon.types.VarCharType;
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.DistributedQueryRunner;
 import io.trino.testing.MaterializedResult;
+import io.trino.testing.QueryFailedException;
 import io.trino.testing.QueryRunner;
 import org.junit.jupiter.api.Test;
 
@@ -68,6 +69,7 @@ import static 
io.trino.testing.TestingSession.testSessionBuilder;
 import static java.time.ZoneOffset.UTC;
 import static org.apache.paimon.data.BinaryString.fromString;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 
 /** ITCase for trino connector. */
 public class TrinoITCase extends AbstractTestQueryFramework {
@@ -854,6 +856,32 @@ public class TrinoITCase extends 
AbstractTestQueryFramework {
                 .isEqualTo("[[1, 2, 1, 1], [3, 4, 2, 2], [5, 6, 3, 3], [7, 8, 
4, 4]]");
     }
 
+    @Test
+    public void testIncrementalRead() {
+        assertThatExceptionOfType(QueryFailedException.class)
+                .isThrownBy(
+                        () ->
+                                sql(
+                                        "SELECT * FROM 
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2'))"))
+                .withMessage(
+                        "Either INCREMENTAL_BETWEEN or 
INCREMENTAL_BETWEEN_TIMESTAMP must be provided");
+        assertThat(
+                        sql(
+                                "SELECT * FROM 
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2',incremental_between=>'1,2'))"))
+                .isEqualTo("[[5, 6, 3, 3], [7, 8, 4, 4]]");
+        assertThat(
+                        sql(
+                                "SELECT * FROM 
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2',incremental_between=>'1,tag-2'))"))
+                .isEqualTo("[[5, 6, 3, 3], [7, 8, 4, 4]]");
+        assertThat(
+                        sql(
+                                "SELECT * FROM 
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2',incremental_between_timestamp=>'%s,%s'))"
+                                        .formatted(
+                                                t2FirstCommitTimestamp,
+                                                System.currentTimeMillis())))
+                .isEqualTo("[[5, 6, 3, 3], [7, 8, 4, 4]]");
+    }
+
     @Test
     public void testTimeTravelWithTag() {
         // tag or snapshotId is string


Reply via email to