This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch old_version_master
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git


The following commit(s) were added to refs/heads/old_version_master by this 
push:
     new f67712a  [388] Cherry-pick incremental read from 
https://github.com/apache/paimon-trino/commit/5697da8de19a60e52842fd86d612c99e35aa0877\#diff-e62f488a14d012b280ffd60b07e6a3915757e393f68103b42fd3262adc0b9d61,
 make it suitable for trino-388
f67712a is described below

commit f67712ac0c5651e11952d51fea5d28f3229f419d
Author: 仟弋 <yejunhao....@alibaba-inc.com>
AuthorDate: Tue Jul 22 11:25:31 2025 +0800

    [388] Cherry-pick incremental read from 
https://github.com/apache/paimon-trino/commit/5697da8de19a60e52842fd86d612c99e35aa0877\#diff-e62f488a14d012b280ffd60b07e6a3915757e393f68103b42fd3262adc0b9d61,
 make it suitable for trino-388
---
 .../apache/paimon/trino/TableChangesFunction.java  | 223 ++++++++++
 .../paimon/trino/TableChangesFunctionProvider.java |  43 ++
 .../org/apache/paimon/trino/TrinoConnector.java    |  57 +++
 .../apache/paimon/trino/TrinoConnectorBase.java    |  96 +++++
 .../paimon/trino/TrinoConnectorFactoryBase.java    |  97 +++++
 .../org/apache/paimon/trino/TrinoMetadataBase.java | 447 +++++++++++++++++++++
 .../java/org/apache/paimon/trino/TrinoModule.java  |  54 +++
 .../apache/paimon/trino/TrinoSplitManagerBase.java |  66 +++
 .../org/apache/paimon/trino/TrinoTableHandle.java  | 210 ++++++++++
 .../apache/paimon/trino/TestTrino388ITCase.java    |  26 ++
 .../apache/paimon/trino/TrinoTableOptionUtils.java |   2 +-
 11 files changed, 1320 insertions(+), 1 deletion(-)

diff --git 
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunction.java
 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunction.java
new file mode 100644
index 0000000..94c8da9
--- /dev/null
+++ 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunction.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import com.google.inject.Inject;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.ptf.AbstractConnectorTableFunction;
+import io.trino.spi.ptf.Argument;
+import io.trino.spi.ptf.Descriptor;
+import io.trino.spi.ptf.ScalarArgument;
+import io.trino.spi.ptf.ScalarArgumentSpecification;
+import io.trino.spi.ptf.TableFunctionAnalysis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
+import static 
io.trino.spi.ptf.ReturnTypeSpecification.GenericTable.GENERIC_TABLE;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Table function registered as {@code system.table_changes}.
+ *
+ * <p>It resolves a Paimon table by schema and table name, overlays the incremental-read options
+ * supplied as function arguments on a copy of the table, and returns a {@link TrinoTableHandle}
+ * for that copy together with a descriptor of all of the table's columns.
+ *
+ * <p>All arguments are VARCHAR. {@code SCHEMA_NAME} and {@code TABLE_NAME} are required; the
+ * other argument names are derived from the corresponding Paimon option keys through {@code
+ * TrinoTableOptionUtils.convertOptionKey} and upper-cased. At least one of the two
+ * incremental-between arguments must be supplied.
+ */
+public class TableChangesFunction extends AbstractConnectorTableFunction {
+
+    /** Sentinel default marking an optional incremental argument as "not supplied". */
+    private static final Slice INVALID_VALUE = Slices.utf8Slice("invalid");
+
+    private static final String FUNCTION_NAME = "table_changes";
+    private static final String SCHEMA_NAME_VAR_NAME = "SCHEMA_NAME";
+    private static final String TABLE_NAME_VAR_NAME = "TABLE_NAME";
+    private static final String INCREMENTAL_BETWEEN_SCAN_MODE =
+            TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE.key())
+                    .toUpperCase(ENGLISH);
+    private static final String INCREMENTAL_BETWEEN_TIMESTAMP =
+            TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key())
+                    .toUpperCase(ENGLISH);
+    private static final String INCREMENTAL_BETWEEN =
+            TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN.key())
+                    .toUpperCase(ENGLISH);
+
+    private final TrinoMetadata trinoMetadata;
+
+    /**
+     * Declares the function signature and creates the metadata instance used to look up tables.
+     *
+     * @param trinoMetadataFactory factory for the {@link TrinoMetadata} used by {@link #analyze}
+     * @throws NullPointerException if {@code trinoMetadataFactory} is null
+     */
+    @Inject
+    public TableChangesFunction(TrinoMetadataFactory trinoMetadataFactory) {
+        super(
+                "system",
+                FUNCTION_NAME,
+                ImmutableList.of(
+                        ScalarArgumentSpecification.builder()
+                                .name(SCHEMA_NAME_VAR_NAME)
+                                .type(VARCHAR)
+                                .build(),
+                        ScalarArgumentSpecification.builder()
+                                .name(TABLE_NAME_VAR_NAME)
+                                .type(VARCHAR)
+                                .build(),
+                        ScalarArgumentSpecification.builder()
+                                .name(INCREMENTAL_BETWEEN_SCAN_MODE)
+                                .defaultValue(
+                                        Slices.utf8Slice(
+                                                CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE
+                                                        .defaultValue()
+                                                        .toString()))
+                                .type(VARCHAR)
+                                .build(),
+                        ScalarArgumentSpecification.builder()
+                                .name(INCREMENTAL_BETWEEN)
+                                .defaultValue(INVALID_VALUE)
+                                .type(VARCHAR)
+                                .build(),
+                        ScalarArgumentSpecification.builder()
+                                .name(INCREMENTAL_BETWEEN_TIMESTAMP)
+                                .defaultValue(INVALID_VALUE)
+                                .type(VARCHAR)
+                                .build()),
+                GENERIC_TABLE);
+        this.trinoMetadata =
+                requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is null").create();
+    }
+
+    /**
+     * Validates the arguments and builds the table handle for the incremental read.
+     *
+     * @param session the current connector session, forwarded to the metadata lookup
+     * @param transaction the current transaction handle (not used here)
+     * @param arguments the function arguments keyed by argument name
+     * @return the analysis carrying the returned column descriptor and the table handle
+     * @throws TrinoException with {@code INVALID_FUNCTION_ARGUMENT} if the schema or table name
+     *     is null, if neither incremental-between argument is supplied, or if the table cannot
+     *     be resolved
+     * @throws RuntimeException if the table copy cannot be serialized
+     */
+    @Override
+    public TableFunctionAnalysis analyze(
+            ConnectorSession session,
+            ConnectorTransactionHandle transaction,
+            Map<String, Argument> arguments) {
+        String schema = getSchemaName(arguments);
+        String table = getTableName(arguments);
+
+        // A SQL NULL is treated the same as the "invalid" sentinel default: not supplied.
+        Optional<String> incrementalBetween = optionalValue(arguments, INCREMENTAL_BETWEEN);
+        Optional<String> incrementalBetweenTimestamp =
+                optionalValue(arguments, INCREMENTAL_BETWEEN_TIMESTAMP);
+        if (!incrementalBetween.isPresent() && !incrementalBetweenTimestamp.isPresent()) {
+            throw new TrinoException(
+                    INVALID_FUNCTION_ARGUMENT,
+                    "Either "
+                            + INCREMENTAL_BETWEEN
+                            + " or "
+                            + INCREMENTAL_BETWEEN_TIMESTAMP
+                            + " must be provided");
+        }
+
+        // NOTE(review): the INCREMENTAL_BETWEEN_SCAN_MODE argument is declared in the constructor
+        // but is never read here, so a value passed by the caller currently has no effect.
+        // Confirm whether it should be copied into the table options as well.
+
+        SchemaTableName schemaTableName = new SchemaTableName(schema, table);
+        try {
+            TrinoTableHandle trinoTableHandle =
+                    trinoMetadata.getTableHandle(session, schemaTableName);
+            // The Trino SPI allows getTableHandle to return null for a missing table; fail with
+            // a readable error instead of a NullPointerException.
+            if (trinoTableHandle == null) {
+                throw new TrinoException(
+                        INVALID_FUNCTION_ARGUMENT, "Table not found: " + schemaTableName);
+            }
+
+            Table baseTable = trinoTableHandle.table();
+            Map<String, String> options = new HashMap<>(baseTable.options());
+            incrementalBetween.ifPresent(
+                    value -> options.put(CoreOptions.INCREMENTAL_BETWEEN.key(), value));
+            incrementalBetweenTimestamp.ifPresent(
+                    value -> options.put(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key(), value));
+            Table paimonTable = baseTable.copy(options);
+
+            ImmutableList.Builder<Descriptor.Field> columns = ImmutableList.builder();
+            List<ColumnHandle> projectedColumns = new ArrayList<>();
+            paimonTable
+                    .rowType()
+                    .getFields()
+                    .forEach(
+                            column -> {
+                                columns.add(
+                                        new Descriptor.Field(
+                                                column.name(),
+                                                Optional.of(
+                                                        TrinoTypeUtils.fromPaimonType(
+                                                                column.type()))));
+                                // Lower-case with a fixed locale so the handle name does not
+                                // depend on the JVM default locale.
+                                projectedColumns.add(
+                                        TrinoColumnHandle.of(
+                                                column.name().toLowerCase(ENGLISH),
+                                                column.type()));
+                            });
+            return TableFunctionAnalysis.builder()
+                    .returnedType(new Descriptor(columns.build()))
+                    .handle(
+                            new TrinoTableHandle(
+                                    schema,
+                                    table,
+                                    InstantiationUtil.serializeObject(paimonTable),
+                                    TupleDomain.all(),
+                                    Optional.of(projectedColumns),
+                                    OptionalLong.empty()))
+                    .build();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to serialize table", e);
+        }
+    }
+
+    /** Returns the required {@code SCHEMA_NAME} argument as a string. */
+    private static String getSchemaName(Map<String, Argument> arguments) {
+        return requiredValue(arguments, SCHEMA_NAME_VAR_NAME);
+    }
+
+    /** Returns the required {@code TABLE_NAME} argument as a string. */
+    private static String getTableName(Map<String, Argument> arguments) {
+        return requiredValue(arguments, TABLE_NAME_VAR_NAME);
+    }
+
+    /**
+     * Reads a required scalar VARCHAR argument.
+     *
+     * @throws TrinoException with {@code INVALID_FUNCTION_ARGUMENT} if the value is null
+     * @throws IllegalArgumentException if the argument is not a {@link ScalarArgument}
+     */
+    private static String requiredValue(Map<String, Argument> arguments, String name) {
+        Object value = scalarValue(arguments, name);
+        if (value == null) {
+            throw new TrinoException(INVALID_FUNCTION_ARGUMENT, name + " argument not found");
+        }
+        return ((Slice) value).toStringUtf8();
+    }
+
+    /**
+     * Reads an optional scalar VARCHAR argument, mapping both a null value and the {@link
+     * #INVALID_VALUE} sentinel to an empty result.
+     *
+     * @throws IllegalArgumentException if the argument is not a {@link ScalarArgument}
+     */
+    private static Optional<String> optionalValue(Map<String, Argument> arguments, String name) {
+        Object value = scalarValue(arguments, name);
+        if (value == null || INVALID_VALUE.equals(value)) {
+            return Optional.empty();
+        }
+        return Optional.of(((Slice) value).toStringUtf8());
+    }
+
+    /**
+     * Returns the raw, possibly null, value of the named scalar argument.
+     *
+     * @throws IllegalArgumentException if the argument is missing or is not a {@link
+     *     ScalarArgument}
+     */
+    private static Object scalarValue(Map<String, Argument> arguments, String name) {
+        Argument argument = arguments.get(name);
+        if (argument instanceof ScalarArgument) {
+            return ((ScalarArgument) argument).getValue();
+        }
+        throw new IllegalArgumentException("Unsupported argument type: " + argument);
+    }
+}
diff --git 
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunctionProvider.java
 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunctionProvider.java
new file mode 100644
index 0000000..0f9f8a9
--- /dev/null
+++ 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunctionProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorTableFunction;
+import io.trino.spi.ptf.ConnectorTableFunction;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Guice {@link Provider} that supplies the {@link TableChangesFunction}, wrapped in a {@link
+ * ClassLoaderSafeConnectorTableFunction} bound to this provider's class loader.
+ */
+public class TableChangesFunctionProvider implements Provider<ConnectorTableFunction> {
+    private final TrinoMetadataFactory metadataFactory;
+
+    /**
+     * @param trinoMetadataFactory factory handed to every {@link TableChangesFunction} created
+     *     by {@link #get()}
+     * @throws NullPointerException if {@code trinoMetadataFactory} is null
+     */
+    @Inject
+    public TableChangesFunctionProvider(TrinoMetadataFactory trinoMetadataFactory) {
+        requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is null");
+        this.metadataFactory = trinoMetadataFactory;
+    }
+
+    /** Creates a new wrapped {@link TableChangesFunction} on each call. */
+    @Override
+    public ConnectorTableFunction get() {
+        ConnectorTableFunction delegate = new TableChangesFunction(metadataFactory);
+        ClassLoader pluginClassLoader = getClass().getClassLoader();
+        return new ClassLoaderSafeConnectorTableFunction(delegate, pluginClassLoader);
+    }
+}
diff --git 
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnector.java 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnector.java
new file mode 100644
index 0000000..043616f
--- /dev/null
+++ b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.ptf.ConnectorTableFunction;
+import io.trino.spi.transaction.IsolationLevel;
+
+import java.util.Set;
+
+/**
+ * Concrete Trino {@link Connector} of this module.
+ *
+ * <p>It adds no state of its own: the constructor forwards everything to {@link
+ * TrinoConnectorBase}, and the two overridden methods delegate to the base class's {@code
+ * beginTransactionBase} and {@code getMetadataBase} helpers.
+ */
+public class TrinoConnector extends TrinoConnectorBase {
+
+    /** Passes all collaborators through unchanged to the {@link TrinoConnectorBase} constructor. */
+    public TrinoConnector(
+            TrinoMetadataBase trinoMetadata,
+            TrinoSplitManagerBase trinoSplitManager,
+            TrinoPageSourceProvider trinoPageSourceProvider,
+            TrinoTableOptions trinoTableOptions,
+            TrinoSessionProperties trinoSessionProperties,
+            Set<ConnectorTableFunction> connectorTableFunctions) {
+        super(
+                trinoMetadata,
+                trinoSplitManager,
+                trinoPageSourceProvider,
+                trinoTableOptions,
+                trinoSessionProperties,
+                connectorTableFunctions);
+    }
+
+    /** Delegates to {@code beginTransactionBase} with the same arguments. */
+    @Override
+    public ConnectorTransactionHandle beginTransaction(
+            IsolationLevel isolationLevel, boolean readOnly) {
+        return beginTransactionBase(isolationLevel, readOnly);
+    }
+
+    /** Delegates to {@code getMetadataBase} with the same transaction handle. */
+    @Override
+    public TrinoMetadataBase getMetadata(ConnectorTransactionHandle 
transactionHandle) {
+        return getMetadataBase(transactionHandle);
+    }
+}
diff --git 
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorBase.java
 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorBase.java
new file mode 100644
index 0000000..24d6aeb
--- /dev/null
+++ 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorBase.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
+
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.ptf.ConnectorTableFunction;
+import io.trino.spi.session.PropertyMetadata;
+import io.trino.spi.transaction.IsolationLevel;
+
+import java.util.List;
+import java.util.Set;
+
+import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for the Trino {@link Connector} implementation.
+ *
+ * <p>It holds the metadata, split manager, page source provider, table and session property
+ * definitions, and table functions handed to the constructor, and returns them from the
+ * corresponding {@link Connector} accessors. Transaction start and metadata lookup are offered
+ * as {@code protected} helpers for subclasses to call from their own overrides.
+ */
+public abstract class TrinoConnectorBase implements Connector {
+    private final TrinoMetadataBase trinoMetadata;
+    private final TrinoSplitManagerBase trinoSplitManager;
+    private final TrinoPageSourceProvider trinoPageSourceProvider;
+    private final List<PropertyMetadata<?>> tableProperties;
+    private final List<PropertyMetadata<?>> sessionProperties;
+    private final Set<ConnectorTableFunction> tableFunctions;
+
+    /**
+     * @param trinoMetadata metadata returned by {@link #getMetadataBase}
+     * @param trinoSplitManager split manager returned by {@link #getSplitManager()}
+     * @param trinoPageSourceProvider provider returned by {@link #getPageSourceProvider()}
+     * @param trinoTableOptions source of the table property definitions
+     * @param trinoSessionProperties source of the session property definitions
+     * @param tableFunctions table functions exposed by this connector; copied defensively
+     * @throws NullPointerException if any argument is null
+     */
+    public TrinoConnectorBase(
+            TrinoMetadataBase trinoMetadata,
+            TrinoSplitManagerBase trinoSplitManager,
+            TrinoPageSourceProvider trinoPageSourceProvider,
+            TrinoTableOptions trinoTableOptions,
+            TrinoSessionProperties trinoSessionProperties,
+            Set<ConnectorTableFunction> tableFunctions) {
+        // The messages name the actual parameters; the previous "jmx..." wording was a leftover
+        // from another connector and pointed at the wrong argument.
+        this.trinoMetadata = requireNonNull(trinoMetadata, "trinoMetadata is null");
+        this.trinoSplitManager = requireNonNull(trinoSplitManager, "trinoSplitManager is null");
+        this.trinoPageSourceProvider =
+                requireNonNull(trinoPageSourceProvider, "trinoPageSourceProvider is null");
+        this.tableProperties =
+                requireNonNull(trinoTableOptions, "trinoTableOptions is null")
+                        .getTableProperties();
+        this.sessionProperties =
+                requireNonNull(trinoSessionProperties, "trinoSessionProperties is null")
+                        .getSessionProperties();
+        this.tableFunctions =
+                ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null"));
+    }
+
+    /**
+     * Checks the requested isolation level against {@code READ_COMMITTED} and returns the
+     * shared {@link TrinoTransactionHandle#INSTANCE}.
+     *
+     * @param isolationLevel the isolation level requested by the engine
+     * @param readOnly not used by this implementation
+     */
+    protected ConnectorTransactionHandle beginTransactionBase(
+            IsolationLevel isolationLevel, boolean readOnly) {
+        checkConnectorSupports(READ_COMMITTED, isolationLevel);
+        return TrinoTransactionHandle.INSTANCE;
+    }
+
+    /**
+     * Returns the metadata instance passed to the constructor.
+     *
+     * @param transactionHandle not used; the same instance is returned for every transaction
+     */
+    protected TrinoMetadataBase getMetadataBase(ConnectorTransactionHandle transactionHandle) {
+        return trinoMetadata;
+    }
+
+    @Override
+    public TrinoSplitManagerBase getSplitManager() {
+        return trinoSplitManager;
+    }
+
+    @Override
+    public TrinoPageSourceProvider getPageSourceProvider() {
+        return trinoPageSourceProvider;
+    }
+
+    @Override
+    public List<PropertyMetadata<?>> getSessionProperties() {
+        return sessionProperties;
+    }
+
+    @Override
+    public List<PropertyMetadata<?>> getTableProperties() {
+        return tableProperties;
+    }
+
+    @Override
+    public Set<ConnectorTableFunction> getTableFunctions() {
+        return tableFunctions;
+    }
+}
diff --git 
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorFactoryBase.java
 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorFactoryBase.java
new file mode 100644
index 0000000..8a54f1d
--- /dev/null
+++ 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorFactoryBase.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.json.JsonModule;
+import io.trino.plugin.hive.HiveHdfsModule;
+import io.trino.plugin.hive.NodeVersion;
+import io.trino.plugin.hive.authentication.HdfsAuthenticationModule;
+import io.trino.spi.classloader.ThreadContextClassLoader;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.connector.ConnectorFactory;
+import io.trino.spi.ptf.ConnectorTableFunction;
+import io.trino.spi.type.TypeManager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base Trino {@link ConnectorFactory} for the connector named {@code "paimon"}.
+ *
+ * <p>{@link #create} bootstraps an injector from {@link #modules}, looks up the connector's
+ * collaborators from it and assembles a {@link TrinoConnector}. Subclasses may override {@link
+ * #modules} to change the set of modules.
+ */
+public abstract class TrinoConnectorFactoryBase implements ConnectorFactory {
+
+    /** Returns the fixed connector name {@code "paimon"}. */
+    @Override
+    public String getName() {
+        return "paimon";
+    }
+
+    /**
+     * Builds the connector for one catalog.
+     *
+     * @param catalogName forwarded to {@link #modules}
+     * @param config catalog configuration; passed to the modules and also registered as the
+     *     bootstrap's optional configuration properties
+     * @param context connector context forwarded to {@link #modules}
+     * @return a new {@link TrinoConnector} wired from the injector
+     */
+    @Override
+    public Connector create(
+            String catalogName, Map<String, String> config, ConnectorContext 
context) {
+
+        // The whole bootstrap runs inside a ThreadContextClassLoader scope created from this
+        // class's class loader (presumably so lookups resolve against the plugin - confirm).
+        try (ThreadContextClassLoader ignored =
+                new 
ThreadContextClassLoader(TrinoConnectorFactoryBase.class.getClassLoader())) {
+            Bootstrap app = new Bootstrap(modules(catalogName, config, 
context));
+
+            // No required properties; everything in config is treated as optional.
+            Injector injector =
+                    app.doNotInitializeLogging()
+                            .setRequiredConfigurationProperties(Map.of())
+                            .setOptionalConfigurationProperties(config)
+                            .initialize();
+
+            // The metadata is created through its factory; the others are looked up directly.
+            TrinoMetadata trinoMetadata = 
injector.getInstance(TrinoMetadataFactory.class).create();
+            TrinoSplitManager trinoSplitManager = 
injector.getInstance(TrinoSplitManager.class);
+            TrinoPageSourceProvider trinoPageSourceProvider =
+                    injector.getInstance(TrinoPageSourceProvider.class);
+            TrinoSessionProperties trinoSessionProperties =
+                    injector.getInstance(TrinoSessionProperties.class);
+            TrinoTableOptions trinoTableOptions = 
injector.getInstance(TrinoTableOptions.class);
+            // The anonymous Key captures the generic type Set<ConnectorTableFunction> from the
+            // assignment target.
+            Set<ConnectorTableFunction> connectorTableFunctions =
+                    injector.getInstance(new Key<>() {});
+
+            return new TrinoConnector(
+                    trinoMetadata,
+                    trinoSplitManager,
+                    trinoPageSourceProvider,
+                    trinoTableOptions,
+                    trinoSessionProperties,
+                    connectorTableFunctions);
+        }
+    }
+
+    /**
+     * Returns the modules used to bootstrap the connector: JSON, the connector's own {@link
+     * TrinoModule}, the Hive HDFS and HDFS authentication modules, and an inline module that
+     * binds {@link NodeVersion} and {@link TypeManager} from the connector context.
+     *
+     * @param catalogName not used by this implementation
+     * @param config catalog configuration handed to {@link TrinoModule}
+     * @param context source of the current node version and the type manager
+     */
+    protected Module[] modules(
+            String catalogName, Map<String, String> config, ConnectorContext 
context) {
+        return new Module[] {
+            new JsonModule(),
+            new TrinoModule(config),
+            new HiveHdfsModule(),
+            new HdfsAuthenticationModule(),
+            binder -> {
+                binder.bind(NodeVersion.class)
+                        .toInstance(
+                                new NodeVersion(
+                                        
context.getNodeManager().getCurrentNode().getVersion()));
+                
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+            }
+        };
+    }
+}
diff --git 
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java
new file mode 100644
index 0000000..dfcc435
--- /dev/null
+++ 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.InstantiationUtil;
+import org.apache.paimon.utils.StringUtils;
+
+import io.trino.spi.connector.Assignment;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.ConnectorTableProperties;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.LimitApplicationResult;
+import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.SchemaTablePrefix;
+import io.trino.spi.connector.TableFunctionApplicationResult;
+import io.trino.spi.expression.ConnectorExpression;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.ptf.ConnectorTableFunctionHandle;
+import io.trino.spi.security.TrinoPrincipal;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Trino {@link ConnectorMetadata}. */
+public abstract class TrinoMetadataBase implements ConnectorMetadata {
+
+    protected final Catalog catalog;
+
+    /**
+     * Creates the metadata implementation on top of a Paimon catalog.
+     *
+     * @param catalog catalog that every schema and table operation is delegated to
+     * @throws NullPointerException if {@code catalog} is null
+     */
+    public TrinoMetadataBase(Catalog catalog) {
+        // Fail at construction time instead of on the first metadata call.
+        this.catalog = requireNonNull(catalog, "catalog is null");
+    }
+
+    /**
+     * Lists the schema names visible through this connector.
+     *
+     * @param session current session; not used
+     * @return the result of the private {@code listSchemaNames()} helper
+     */
+    @Override
+    public List<String> listSchemaNames(ConnectorSession session) {
+        return listSchemaNames();
+    }
+
+    /** Returns the database names reported by the catalog. */
+    private List<String> listSchemaNames() {
+        return catalog.listDatabases();
+    }
+
+    /**
+     * Creates the catalog database that backs a Trino schema.
+     *
+     * @param session current session; not used
+     * @param schemaName name of the database to create; must not be null or blank
+     * @param properties schema properties; not used
+     * @param owner schema owner; not used
+     * @throws IllegalArgumentException if {@code schemaName} is null or blank
+     * @throws RuntimeException if the catalog reports that the database already exists
+     */
+    @Override
+    public void createSchema(
+            ConnectorSession session,
+            String schemaName,
+            Map<String, Object> properties,
+            TrinoPrincipal owner) {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(schemaName),
+                "schemaName cannot be null or empty");
+
+        try {
+            // The second argument asks the catalog to tolerate an existing database.
+            catalog.createDatabase(schemaName, true);
+        } catch (Catalog.DatabaseAlreadyExistException e) {
+            // Keep the catalog exception as the cause so the failure stays diagnosable.
+            throw new RuntimeException(
+                    format("database already existed: '%s'", schemaName), e);
+        }
+    }
+
+    /**
+     * Drops the catalog database that backs a Trino schema.
+     *
+     * @param session current session; not used
+     * @param schemaName name of the database to drop; must not be null or blank
+     * @throws IllegalArgumentException if {@code schemaName} is null or blank
+     * @throws RuntimeException if the catalog reports the database as missing or not empty
+     */
+    @Override
+    public void dropSchema(ConnectorSession session, String schemaName) {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(schemaName),
+                "schemaName cannot be null or empty");
+        try {
+            catalog.dropDatabase(schemaName, false, true);
+        } catch (Catalog.DatabaseNotEmptyException e) {
+            // Keep the catalog exception as the cause in both branches.
+            throw new RuntimeException(format("database is not empty: '%s'", schemaName), e);
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new RuntimeException(format("database not exists: '%s'", schemaName), e);
+        }
+    }
+
+    /**
+     * Resolves a table handle without any dynamic options.
+     *
+     * @param session current session; not used
+     * @param tableName schema-qualified table name
+     * @return the result of {@code getTableHandle(tableName, null)}, which is {@code null}
+     *     when the catalog reports the table as missing
+     */
+    @Override
+    public TrinoTableHandle getTableHandle(ConnectorSession session, 
SchemaTableName tableName) {
+        return getTableHandle(tableName, null);
+    }
+
+    /**
+     * Returns a default-constructed {@link ConnectorTableProperties}; neither argument is
+     * consulted.
+     */
+    @Override
+    public ConnectorTableProperties getTableProperties(
+            ConnectorSession session, ConnectorTableHandle table) {
+        return new ConnectorTableProperties();
+    }
+
+    /**
+     * Loads a table from the catalog and wraps its serialized form in a handle.
+     *
+     * @param tableName schema-qualified table name
+     * @param dynamicOptions options applied through {@code Table.copy} before serialization;
+     *     {@code null} or empty leaves the table as loaded
+     * @return the handle, or {@code null} when the catalog reports the table as missing
+     * @throws UncheckedIOException if the table cannot be serialized
+     */
+    public TrinoTableHandle getTableHandle(
+            SchemaTableName tableName, Map<String, String> dynamicOptions) {
+        String schema = tableName.getSchemaName();
+        String name = tableName.getTableName();
+        Identifier identifier = new Identifier(schema, name);
+        boolean hasDynamicOptions = !(dynamicOptions == null || dynamicOptions.isEmpty());
+
+        try {
+            Table loaded = catalog.getTable(identifier);
+            Table effective = hasDynamicOptions ? loaded.copy(dynamicOptions) : loaded;
+            byte[] bytes = InstantiationUtil.serializeObject(effective);
+            return new TrinoTableHandle(schema, name, bytes);
+        } catch (Catalog.TableNotExistException e) {
+            // A missing table is signalled to the caller with a null handle.
+            return null;
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Returns the metadata exposed by the handle itself.
+     *
+     * @param session current session; not used
+     * @param tableHandle handle to describe; cast to {@link TrinoTableHandle}
+     */
+    @Override
+    public ConnectorTableMetadata getTableMetadata(
+            ConnectorSession session, ConnectorTableHandle tableHandle) {
+        return ((TrinoTableHandle) tableHandle).tableMetadata();
+    }
+
+    /**
+     * Lists tables in one schema, or in every database of the catalog when no schema is
+     * given.
+     *
+     * @param session current session; not used
+     * @param schemaName schema to list, or empty to list all databases
+     * @return schema-qualified table names, grouped by schema in listing order
+     */
+    @Override
+    public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName) {
+        List<String> schemas =
+                schemaName.isPresent()
+                        ? Collections.singletonList(schemaName.get())
+                        : catalog.listDatabases();
+        return schemas.stream()
+                .flatMap(schema -> listTables(schema).stream())
+                .collect(toList());
+    }
+
+    /**
+     * Lists the tables of a single schema.
+     *
+     * @param schema database name
+     * @return schema-qualified names of the tables reported by the catalog
+     * @throws RuntimeException wrapping the catalog exception when the database is missing
+     */
+    private List<SchemaTableName> listTables(String schema) {
+        List<String> tableNames;
+        try {
+            tableNames = catalog.listTables(schema);
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new RuntimeException(e);
+        }
+        return tableNames.stream()
+                .map(tableName -> new SchemaTableName(schema, tableName))
+                .collect(toList());
+    }
+
+    /**
+     * Creates a table in the catalog from Trino table metadata.
+     *
+     * @param session current session; not used
+     * @param tableMetadata name, columns and properties of the table to create
+     * @param ignoreExisting when {@code true}, an already existing table is not an error;
+     *     forwarded to the catalog instead of being discarded
+     * @throws RuntimeException if the database is missing, or if the table already exists and
+     *     {@code ignoreExisting} is {@code false}
+     */
+    @Override
+    public void createTable(
+            ConnectorSession session,
+            ConnectorTableMetadata tableMetadata,
+            boolean ignoreExisting) {
+        SchemaTableName table = tableMetadata.getTable();
+        Identifier identifier = Identifier.create(table.getSchemaName(), table.getTableName());
+
+        try {
+            catalog.createTable(identifier, prepareSchema(tableMetadata), ignoreExisting);
+        } catch (Catalog.DatabaseNotExistException e) {
+            // Keep the catalog exception as the cause in both branches.
+            throw new RuntimeException(
+                    format("database not exists: '%s'", table.getSchemaName()), e);
+        } catch (Catalog.TableAlreadyExistException e) {
+            throw new RuntimeException(
+                    format("table already existed: '%s'", table.getTableName()), e);
+        }
+    }
+
+    /**
+     * Translates Trino table metadata into a Paimon {@link Schema}.
+     *
+     * <p>Primary keys and partition keys are read from the table properties, every column is
+     * converted with {@code TrinoTypeUtils.toPaimonType}, and the property map is then handed
+     * to {@code TrinoTableOptionUtils.buildOptions}.
+     *
+     * @param tableMetadata metadata of the table being created
+     * @return the schema to pass to the catalog
+     */
+    private Schema prepareSchema(ConnectorTableMetadata tableMetadata) {
+        // Work on a private copy of the properties rather than on the metadata's own map.
+        Map<String, Object> tableProperties = new HashMap<>(tableMetadata.getProperties());
+
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .primaryKey(TrinoTableOptions.getPrimaryKeys(tableProperties))
+                        .partitionKeys(TrinoTableOptions.getPartitionedKeys(tableProperties));
+
+        tableMetadata
+                .getColumns()
+                .forEach(
+                        col ->
+                                schemaBuilder.column(
+                                        col.getName(),
+                                        TrinoTypeUtils.toPaimonType(col.getType()),
+                                        col.getComment()));
+
+        TrinoTableOptionUtils.buildOptions(schemaBuilder, tableProperties);
+        return schemaBuilder.build();
+    }
+
+    /**
+     * Renames a table in the catalog.
+     *
+     * @param session current session; not used
+     * @param tableHandle handle of the table to rename; cast to {@link TrinoTableHandle}
+     * @param newTableName target schema and table name
+     * @throws RuntimeException if the source table is missing or the target already exists
+     */
+    @Override
+    public void renameTable(
+            ConnectorSession session,
+            ConnectorTableHandle tableHandle,
+            SchemaTableName newTableName) {
+        TrinoTableHandle oldTableHandle = (TrinoTableHandle) tableHandle;
+        try {
+            catalog.renameTable(
+                    new Identifier(oldTableHandle.getSchemaName(), oldTableHandle.getTableName()),
+                    new Identifier(newTableName.getSchemaName(), newTableName.getTableName()),
+                    false);
+        } catch (Catalog.TableNotExistException e) {
+            // Keep the catalog exception as the cause in both branches.
+            throw new RuntimeException(
+                    format("table not exists: '%s'", oldTableHandle.getTableName()), e);
+        } catch (Catalog.TableAlreadyExistException e) {
+            throw new RuntimeException(
+                    format("table already existed: '%s'", newTableName.getTableName()), e);
+        }
+    }
+
+    /**
+     * Drops a table from the catalog.
+     *
+     * @param session current session; not used
+     * @param tableHandle handle of the table to drop; cast to {@link TrinoTableHandle}
+     * @throws RuntimeException if the catalog reports the table as missing
+     */
+    @Override
+    public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) {
+        TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
+        try {
+            catalog.dropTable(
+                    new Identifier(
+                            trinoTableHandle.getSchemaName(), trinoTableHandle.getTableName()),
+                    false);
+        } catch (Catalog.TableNotExistException e) {
+            // Keep the catalog exception as the cause so the failure stays diagnosable.
+            throw new RuntimeException(
+                    format("table not exists: '%s'", trinoTableHandle.getTableName()), e);
+        }
+    }
+
+    /**
+     * Builds the column-name to column-handle mapping of a table.
+     *
+     * @param session current session; not used
+     * @param tableHandle table to inspect; cast to {@link TrinoTableHandle}
+     * @return a mutable map with one entry per column reported by the handle
+     */
+    @Override
+    public Map<String, ColumnHandle> getColumnHandles(
+            ConnectorSession session, ConnectorTableHandle tableHandle) {
+        TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
+        Map<String, ColumnHandle> handlesByName = new HashMap<>();
+        trinoTableHandle.columnMetadatas().stream()
+                .map(ColumnMetadata::getName)
+                .forEach(name -> handlesByName.put(name, trinoTableHandle.columnHandle(name)));
+        return handlesByName;
+    }
+
+    /**
+     * Returns the metadata carried by the column handle; the session and table handle are
+     * not consulted.
+     *
+     * @param columnHandle handle to describe; cast to {@link TrinoColumnHandle}
+     */
+    @Override
+    public ColumnMetadata getColumnMetadata(
+            ConnectorSession session, ConnectorTableHandle tableHandle, 
ColumnHandle columnHandle) {
+        return ((TrinoColumnHandle) columnHandle).getColumnMetadata();
+    }
+
+    @Override
+    public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(
+            ConnectorSession session, SchemaTablePrefix prefix) {
+        requireNonNull(prefix, "prefix is null");
+        List<SchemaTableName> tableNames;
+        if (prefix.getTable().isPresent()) {
+            tableNames = Collections.singletonList(prefix.toSchemaTableName());
+        } else {
+            tableNames = listTables(session, prefix.getSchema());
+        }
+
+        return tableNames.stream()
+                .collect(
+                        toMap(
+                                Function.identity(),
+                                table -> getTableHandle(session, 
table).columnMetadatas()));
+    }
+
+    /**
+     * Adds a column to a table through a single-change {@code alterTable} call.
+     *
+     * @param session current session; not used
+     * @param tableHandle table to alter; cast to {@link TrinoTableHandle}
+     * @param column name and type of the column to add
+     * @throws RuntimeException wrapping any failure raised by the catalog
+     */
+    @Override
+    public void addColumn(
+            ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column) {
+        TrinoTableHandle handle = (TrinoTableHandle) tableHandle;
+        Identifier target = new Identifier(handle.getSchemaName(), handle.getTableName());
+        List<SchemaChange> schemaChanges =
+                new ArrayList<>(
+                        Collections.singletonList(
+                                SchemaChange.addColumn(
+                                        column.getName(),
+                                        TrinoTypeUtils.toPaimonType(column.getType()))));
+        try {
+            catalog.alterTable(target, schemaChanges, false);
+        } catch (Exception e) {
+            String message = format("failed to alter table: '%s'", handle.getTableName());
+            throw new RuntimeException(message, e);
+        }
+    }
+
+    /**
+     * Renames a column through a single-change {@code alterTable} call.
+     *
+     * @param session current session; not used
+     * @param tableHandle table to alter; cast to {@link TrinoTableHandle}
+     * @param source column to rename; cast to {@link TrinoColumnHandle}
+     * @param target new column name
+     * @throws RuntimeException wrapping any failure raised by the catalog
+     */
+    @Override
+    public void renameColumn(
+            ConnectorSession session,
+            ConnectorTableHandle tableHandle,
+            ColumnHandle source,
+            String target) {
+        TrinoTableHandle handle = (TrinoTableHandle) tableHandle;
+        Identifier tableId = new Identifier(handle.getSchemaName(), handle.getTableName());
+        String currentName = ((TrinoColumnHandle) source).getColumnName();
+        List<SchemaChange> schemaChanges =
+                new ArrayList<>(
+                        Collections.singletonList(
+                                SchemaChange.renameColumn(currentName, target)));
+        try {
+            catalog.alterTable(tableId, schemaChanges, false);
+        } catch (Exception e) {
+            String message = format("failed to alter table: '%s'", handle.getTableName());
+            throw new RuntimeException(message, e);
+        }
+    }
+
+    /**
+     * Drops a column through a single-change {@code alterTable} call.
+     *
+     * @param session current session; not used
+     * @param tableHandle table to alter; cast to {@link TrinoTableHandle}
+     * @param column column to drop; cast to {@link TrinoColumnHandle}
+     * @throws RuntimeException wrapping any failure raised by the catalog
+     */
+    @Override
+    public void dropColumn(
+            ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column) {
+        TrinoTableHandle handle = (TrinoTableHandle) tableHandle;
+        Identifier tableId = new Identifier(handle.getSchemaName(), handle.getTableName());
+        String columnName = ((TrinoColumnHandle) column).getColumnName();
+        List<SchemaChange> schemaChanges =
+                new ArrayList<>(Collections.singletonList(SchemaChange.dropColumn(columnName)));
+        try {
+            catalog.alterTable(tableId, schemaChanges, false);
+        } catch (Exception e) {
+            String message = format("failed to alter table: '%s'", handle.getTableName());
+            throw new RuntimeException(message, e);
+        }
+    }
+
+    /**
+     * Merges the engine's constraint into the filter carried by the table handle.
+     *
+     * @param session current session; not used
+     * @param handle table handle; cast to {@link TrinoTableHandle}
+     * @param constraint constraint whose summary is intersected with the handle's filter
+     * @return empty when the intersection equals the filter already on the handle; otherwise
+     *     a copy of the handle carrying the merged filter, plus the domains handed back as
+     *     the remaining filter
+     */
+    @Override
+    public Optional<ConstraintApplicationResult<ConnectorTableHandle>> 
applyFilter(
+            ConnectorSession session, ConnectorTableHandle handle, Constraint 
constraint) {
+        TrinoTableHandle trinoTableHandle = (TrinoTableHandle) handle;
+        TupleDomain<TrinoColumnHandle> oldFilter = 
trinoTableHandle.getFilter();
+        TupleDomain<TrinoColumnHandle> newFilter =
+                constraint
+                        .getSummary()
+                        .transformKeys(TrinoColumnHandle.class::cast)
+                        .intersect(oldFilter);
+        // Nothing new to push down.
+        if (oldFilter.equals(newFilter)) {
+            return Optional.empty();
+        }
+
+        // NOTE(review): the converter presumably fills the two maps with the domains it can
+        // and cannot translate -- confirm against TrinoFilterConverter.
+        LinkedHashMap<TrinoColumnHandle, Domain> acceptedDomains = new 
LinkedHashMap<>();
+        LinkedHashMap<TrinoColumnHandle, Domain> unsupportedDomains = new 
LinkedHashMap<>();
+        new TrinoFilterConverter(trinoTableHandle.table().rowType())
+                .convert(newFilter, acceptedDomains, unsupportedDomains);
+
+        // Accepted domains on non-partition columns are still returned to the engine below.
+        List<String> partitionKeys = trinoTableHandle.table().partitionKeys();
+        LinkedHashMap<TrinoColumnHandle, Domain> unenforcedDomains = new 
LinkedHashMap<>();
+        acceptedDomains.forEach(
+                (columnHandle, domain) -> {
+                    if (!partitionKeys.contains(columnHandle.getColumnName())) 
{
+                        unenforcedDomains.put(columnHandle, domain);
+                    }
+                });
+
+        // Remaining filter = unsupported domains plus accepted non-partition domains.
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        TupleDomain<ColumnHandle> remain =
+                (TupleDomain)
+                        TupleDomain.withColumnDomains(unsupportedDomains)
+                                
.intersect(TupleDomain.withColumnDomains(unenforcedDomains));
+
+        // The returned handle carries the full merged filter, not just the enforced part.
+        return Optional.of(
+                new 
ConstraintApplicationResult<>(trinoTableHandle.copy(newFilter), remain, false));
+    }
+
+    /**
+     * Records the requested columns on the table handle.
+     *
+     * @param session current session; not used
+     * @param handle table handle; cast to {@link TrinoTableHandle}
+     * @param projections projections, returned unchanged in the result
+     * @param assignments variable name to column handle mapping of the referenced columns
+     * @return empty when the handle already projects the same set of columns; otherwise a
+     *     copy of the handle with the new projection and one assignment per entry
+     */
+    @Override
+    public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjection(
+            ConnectorSession session,
+            ConnectorTableHandle handle,
+            List<ConnectorExpression> projections,
+            Map<String, ColumnHandle> assignments) {
+        TrinoTableHandle current = (TrinoTableHandle) handle;
+        List<ColumnHandle> requested = new ArrayList<>(assignments.values());
+
+        Optional<List<ColumnHandle>> existing = current.getProjectedColumns();
+        boolean sameProjection =
+                existing.isPresent() && containSameElements(requested, existing.get());
+        if (sameProjection) {
+            return Optional.empty();
+        }
+
+        List<Assignment> resultAssignments = new ArrayList<>();
+        for (Map.Entry<String, ColumnHandle> entry : assignments.entrySet()) {
+            ColumnHandle column = entry.getValue();
+            resultAssignments.add(
+                    new Assignment(
+                            entry.getKey(), column, ((TrinoColumnHandle) column).getTrinoType()));
+        }
+
+        TrinoTableHandle projected = current.copy(Optional.of(requested));
+        return Optional.of(
+                new ProjectionApplicationResult<>(
+                        projected, projections, resultAssignments, false));
+    }
+
+    /**
+     * Compares two column lists as sets, so ordering and duplicates are ignored.
+     *
+     * @return {@code true} when both lists contain the same distinct handles
+     */
+    private static boolean containSameElements(
+            List<? extends ColumnHandle> first, List<? extends ColumnHandle> 
second) {
+        return new HashSet<>(first).equals(new HashSet<>(second));
+    }
+
+    /**
+     * Records a row limit on the table handle when that is safe.
+     *
+     * <p>The limit is rejected when the handle already carries an equal or smaller limit, or
+     * when the handle's filter has a domain the converter reports as unsupported or an
+     * accepted domain on a non-partition column.
+     *
+     * @param session current session; not used
+     * @param handle table handle; cast to {@link TrinoTableHandle}
+     * @param limit requested maximum number of rows
+     * @return a copy of the handle carrying {@code limit}, or empty when it is not applied
+     */
+    @Override
+    public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(
+            ConnectorSession session, ConnectorTableHandle handle, long limit) {
+        TrinoTableHandle current = (TrinoTableHandle) handle;
+
+        OptionalLong existingLimit = current.getLimit();
+        if (existingLimit.isPresent() && existingLimit.getAsLong() <= limit) {
+            return Optional.empty();
+        }
+
+        TupleDomain<TrinoColumnHandle> filter = current.getFilter();
+        if (!filter.isAll()) {
+            LinkedHashMap<TrinoColumnHandle, Domain> accepted = new LinkedHashMap<>();
+            LinkedHashMap<TrinoColumnHandle, Domain> unsupported = new LinkedHashMap<>();
+            new TrinoFilterConverter(current.table().rowType())
+                    .convert(filter, accepted, unsupported);
+
+            if (!unsupported.isEmpty()) {
+                return Optional.empty();
+            }
+
+            Set<String> acceptedFields =
+                    accepted.keySet().stream()
+                            .map(TrinoColumnHandle::getColumnName)
+                            .collect(Collectors.toSet());
+            boolean partitionOnly = current.table().partitionKeys().containsAll(acceptedFields);
+            if (!partitionOnly) {
+                return Optional.empty();
+            }
+        }
+
+        TrinoTableHandle limited = current.copy(OptionalLong.of(limit));
+        return Optional.of(new LimitApplicationResult<>(limited, false, false));
+    }
+
+    @Override
+    public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> 
applyTableFunction(
+            ConnectorSession session, ConnectorTableFunctionHandle handle) {
+        return Optional.of(
+                new TableFunctionApplicationResult(
+                        handle, ((TrinoTableHandle) 
handle).getProjectedColumns().get()));
+    }
+}
diff --git 
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoModule.java 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoModule.java
new file mode 100644
index 0000000..b356b71
--- /dev/null
+++ b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoModule.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.options.Options;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+import io.trino.spi.ptf.ConnectorTableFunction;
+
+import java.util.Map;
+
+import static com.google.inject.Scopes.SINGLETON;
+import static com.google.inject.multibindings.Multibinder.newSetBinder;
+
+/**
+ * Guice module that binds the connector's services.
+ *
+ * <p>The connector configuration is exposed as a Paimon {@link Options} instance, the
+ * connector services are bound as singletons, and {@link TableChangesFunctionProvider} is
+ * registered in the set of {@link ConnectorTableFunction}s.
+ */
+public class TrinoModule implements Module {
+    /** Raw connector configuration; assigned once in the constructor. */
+    private final Map<String, String> config;
+
+    /**
+     * @param config connector configuration used to build the bound {@link Options}
+     */
+    public TrinoModule(Map<String, String> config) {
+        this.config = config;
+    }
+
+    @Override
+    public void configure(Binder binder) {
+        binder.bind(Options.class).toInstance(new Options(config));
+        binder.bind(TrinoMetadataFactory.class).in(SINGLETON);
+        binder.bind(TrinoSplitManager.class).in(SINGLETON);
+        binder.bind(TrinoPageSourceProvider.class).in(SINGLETON);
+        binder.bind(TrinoSessionProperties.class).in(SINGLETON);
+        binder.bind(TrinoTableOptions.class).in(SINGLETON);
+        // Table functions are contributed through a set binder.
+        newSetBinder(binder, ConnectorTableFunction.class)
+                .addBinding()
+                .toProvider(TableChangesFunctionProvider.class)
+                .in(Scopes.SINGLETON);
+    }
+}
diff --git 
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoSplitManagerBase.java
 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoSplitManagerBase.java
new file mode 100644
index 0000000..61a26d2
--- /dev/null
+++ 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoSplitManagerBase.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorSplitSource;
+import io.trino.spi.connector.ConnectorTableHandle;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Trino {@link ConnectorSplitManager}. */
+public abstract class TrinoSplitManagerBase implements ConnectorSplitManager {
+
+    /**
+     * Plans the Paimon splits for a table handle and wraps them in a split source.
+     *
+     * <p>The handle's filter (when convertible) and limit are pushed into the Paimon
+     * {@link ReadBuilder}. Each split is weighted by its row count relative to the largest
+     * split, bounded below by the session's minimum split weight and above by {@code 1.0}.
+     *
+     * @param connectorTableHandle table handle; cast to {@link TrinoTableHandle}
+     * @param session session supplying dynamic table options and the minimum split weight
+     * @return a split source over all planned splits
+     */
+    protected ConnectorSplitSource getSplits(
+            ConnectorTableHandle connectorTableHandle, ConnectorSession session) {
+        // TODO dynamicFilter?
+        // TODO what is constraint?
+
+        TrinoTableHandle tableHandle = (TrinoTableHandle) connectorTableHandle;
+        Table table = tableHandle.tableWithDynamicOptions(session);
+        ReadBuilder readBuilder = table.newReadBuilder();
+        new TrinoFilterConverter(table.rowType())
+                .convert(tableHandle.getFilter())
+                .ifPresent(readBuilder::withFilter);
+        // The read builder takes an int. A plain narrowing cast would wrap limits above
+        // Integer.MAX_VALUE into small or negative values, so clamp instead.
+        tableHandle
+                .getLimit()
+                .ifPresent(
+                        limit ->
+                                readBuilder.withLimit(
+                                        (int) Math.min(limit, (long) Integer.MAX_VALUE)));
+        List<Split> splits = readBuilder.newScan().plan().splits();
+
+        long maxRowCount = splits.stream().mapToLong(Split::rowCount).max().orElse(0L);
+        double minimumSplitWeight = TrinoSessionProperties.getMinimumSplitWeight(session);
+        return new TrinoSplitSource(
+                splits.stream()
+                        .map(
+                                split ->
+                                        TrinoSplit.fromSplit(
+                                                split,
+                                                splitWeight(
+                                                        split.rowCount(),
+                                                        maxRowCount,
+                                                        minimumSplitWeight)))
+                        .collect(Collectors.toList()));
+    }
+
+    /**
+     * Computes a split weight in the range {@code [minimumSplitWeight, 1.0]}.
+     *
+     * <p>When no split reports any rows the ratio is taken as zero. Dividing would yield
+     * {@code 0/0 = NaN}, which would then flow through {@code Math.max}/{@code Math.min}
+     * unchanged.
+     */
+    private static double splitWeight(
+            long rowCount, long maxRowCount, double minimumSplitWeight) {
+        double ratio = maxRowCount > 0 ? (double) rowCount / maxRowCount : 0.0;
+        return Math.min(Math.max(ratio, minimumSplitWeight), 1.0);
+    }
+}
diff --git 
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
new file mode 100644
index 0000000..a9d0a6d
--- /dev/null
+++ 
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.ptf.ConnectorTableFunctionHandle;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+
+/** Trino {@link ConnectorTableHandle}. */
+public final class TrinoTableHandle implements ConnectorTableHandle, 
ConnectorTableFunctionHandle {
+
+    private final String schemaName;
+    private final String tableName;
+    private final byte[] serializedTable;
+    private final TupleDomain<TrinoColumnHandle> filter;
+    private final Optional<List<ColumnHandle>> projectedColumns;
+    private final OptionalLong limit;
+
+    private Table lazyTable;
+
+    public TrinoTableHandle(String schemaName, String tableName, byte[] 
serializedTable) {
+        this(
+                schemaName,
+                tableName,
+                serializedTable,
+                TupleDomain.all(),
+                Optional.empty(),
+                OptionalLong.empty());
+    }
+
+    @JsonCreator
+    public TrinoTableHandle(
+            @JsonProperty("schemaName") String schemaName,
+            @JsonProperty("tableName") String tableName,
+            @JsonProperty("serializedTable") byte[] serializedTable,
+            @JsonProperty("filter") TupleDomain<TrinoColumnHandle> filter,
+            @JsonProperty("projection") Optional<List<ColumnHandle>> 
projectedColumns,
+            @JsonProperty("limit") OptionalLong limit) {
+        this.schemaName = schemaName;
+        this.tableName = tableName;
+        this.serializedTable = serializedTable;
+        this.filter = filter;
+        this.projectedColumns = projectedColumns;
+        this.limit = limit;
+    }
+
+    @JsonProperty
+    public String getSchemaName() {
+        return schemaName;
+    }
+
+    @JsonProperty
+    public String getTableName() {
+        return tableName;
+    }
+
+    @JsonProperty
+    public byte[] getSerializedTable() {
+        return serializedTable;
+    }
+
+    @JsonProperty
+    public TupleDomain<TrinoColumnHandle> getFilter() {
+        return filter;
+    }
+
+    @JsonProperty
+    public Optional<List<ColumnHandle>> getProjectedColumns() {
+        return projectedColumns;
+    }
+
+    public OptionalLong getLimit() {
+        return limit;
+    }
+
+    public TrinoTableHandle copy(TupleDomain<TrinoColumnHandle> filter) {
+        return new TrinoTableHandle(
+                schemaName, tableName, serializedTable, filter, 
projectedColumns, limit);
+    }
+
+    public TrinoTableHandle copy(Optional<List<ColumnHandle>> 
projectedColumns) {
+        return new TrinoTableHandle(
+                schemaName, tableName, serializedTable, filter, 
projectedColumns, limit);
+    }
+
+    public TrinoTableHandle copy(OptionalLong limit) {
+        return new TrinoTableHandle(
+                schemaName, tableName, serializedTable, filter, 
projectedColumns, limit);
+    }
+
+    public Table tableWithDynamicOptions(ConnectorSession session) {
+        // see TrinoConnector.getSessionProperties
+        Map<String, String> dynamicOptions = new HashMap<>();
+        Long scanTimestampMills = 
TrinoSessionProperties.getScanTimestampMillis(session);
+        if (scanTimestampMills != null) {
+            dynamicOptions.put(
+                    CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), 
scanTimestampMills.toString());
+        }
+        Long scanSnapshotId = 
TrinoSessionProperties.getScanSnapshotId(session);
+        if (scanSnapshotId != null) {
+            dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), 
scanSnapshotId.toString());
+        }
+
+        return dynamicOptions.size() > 0 ? table().copy(dynamicOptions) : 
table();
+    }
+
+    public Table table() {
+        if (lazyTable == null) {
+            try {
+                lazyTable =
+                        InstantiationUtil.deserializeObject(
+                                serializedTable, 
this.getClass().getClassLoader());
+            } catch (IOException | ClassNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return lazyTable;
+    }
+
+    public ConnectorTableMetadata tableMetadata() {
+        return new ConnectorTableMetadata(
+                SchemaTableName.schemaTableName(schemaName, tableName),
+                columnMetadatas(),
+                Collections.emptyMap(),
+                Optional.empty());
+    }
+
+    public List<ColumnMetadata> columnMetadatas() {
+        return table().rowType().getFields().stream()
+                .map(
+                        column ->
+                                ColumnMetadata.builder()
+                                        .setName(column.name())
+                                        
.setType(TrinoTypeUtils.fromPaimonType(column.type()))
+                                        
.setNullable(column.type().isNullable())
+                                        
.setComment(Optional.ofNullable(column.description()))
+                                        .build())
+                .collect(Collectors.toList());
+    }
+
+    public TrinoColumnHandle columnHandle(String field) {
+        List<String> fieldNames = FieldNameUtils.fieldNames(table().rowType());
+        int index = fieldNames.indexOf(field);
+        if (index == -1) {
+            throw new RuntimeException(
+                    String.format("Cannot find field %s in schema %s", field, 
fieldNames));
+        }
+        return TrinoColumnHandle.of(field, table().rowType().getTypeAt(index));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TrinoTableHandle that = (TrinoTableHandle) o;
+        return Arrays.equals(serializedTable, that.serializedTable)
+                && Objects.equals(schemaName, that.schemaName)
+                && Objects.equals(tableName, that.tableName)
+                && Objects.equals(filter, that.filter)
+                && Objects.equals(projectedColumns, that.projectedColumns);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                schemaName, tableName, filter, projectedColumns, 
Arrays.hashCode(serializedTable));
+    }
+}
diff --git 
a/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java
 
b/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java
index decefcf..073b4bd 100644
--- 
a/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java
+++ 
b/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java
@@ -18,9 +18,35 @@
 
 package org.apache.paimon.trino;
 
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
 /** {@link TestTrinoITCase} for Trino 388. */
 public class TestTrino388ITCase extends TestTrinoITCase {
     public TestTrino388ITCase() {
         super(388);
     }
+
+    /**
+     * Exercises the {@code paimon.system.table_changes} table function on table
+     * {@code default.t2}.
+     *
+     * <p>Three calls are checked: one without any range argument, which must fail with a
+     * {@link RuntimeException} carrying the exact message asserted below; one with
+     * {@code incremental_between=>'1,2'}; and one with {@code incremental_between_timestamp}
+     * spanning {@code t2FirstCommitTimestamp} to the current time. Both ranged calls are
+     * expected to produce the same two rows.
+     */
+    @Test
+    public void testIncrementalRead() {
+        assertThatExceptionOfType(RuntimeException.class)
+                .isThrownBy(
+                        () ->
+                                sql(
+                                        "SELECT * FROM 
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2'))"))
+                .withMessage(
+                        "Either INCREMENTAL_BETWEEN or 
INCREMENTAL_BETWEEN_TIMESTAMP must be provided");
+        assertThat(
+                        sql(
+                                "SELECT * FROM 
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2',incremental_between=>'1,2'))"))
+                .isEqualTo("[[5, 6, 3, 3], [7, 8, 4, 4]]");
+        assertThat(
+                        sql(
+                                String.format(
+                                        "SELECT * FROM 
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2',incremental_between_timestamp=>'%s,%s'))",
+                                        t2FirstCommitTimestamp, 
System.currentTimeMillis())))
+                .isEqualTo("[[5, 6, 3, 3], [7, 8, 4, 4]]");
+    }
 }
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
index bcde932..b22e3b1 100644
--- 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
@@ -125,7 +125,7 @@ public class TrinoTableOptionUtils {
         }
     }
 
-    private static String convertOptionKey(String key) {
+    public static String convertOptionKey(String key) {
         String regex = "[.\\-]";
         Pattern pattern = Pattern.compile(regex);
         Matcher matcher = pattern.matcher(key);

Reply via email to