This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git
The following commit(s) were added to refs/heads/main by this push:
new 5697da8 Support incremental snapshots reading (#97)
5697da8 is described below
commit 5697da8de19a60e52842fd86d612c99e35aa0877
Author: rfyu <[email protected]>
AuthorDate: Mon Jun 16 16:47:17 2025 +0800
Support incremental snapshots reading (#97)
---
pom.xml | 2 +-
.../org/apache/paimon/trino/TrinoConnector.java | 25 ++-
.../apache/paimon/trino/TrinoConnectorFactory.java | 11 +-
.../java/org/apache/paimon/trino/TrinoModule.java | 13 ++
.../org/apache/paimon/trino/TrinoSplitManager.java | 19 +-
.../org/apache/paimon/trino/TrinoTableHandle.java | 8 +-
.../apache/paimon/trino/TrinoTableOptionUtils.java | 2 +-
.../apache/paimon/trino/catalog/TrinoCatalog.java | 26 +--
.../trino/functions/TrinoFunctionProvider.java | 50 +++++
.../tablechanges/TableChangesFunction.java | 231 +++++++++++++++++++++
.../TableChangesFunctionProcessor.java | 69 ++++++
.../TableChangesFunctionProcessorProvider.java | 55 +++++
.../tablechanges/TableChangesFunctionProvider.java | 45 ++++
.../java/org/apache/paimon/trino/TrinoITCase.java | 28 +++
14 files changed, 561 insertions(+), 23 deletions(-)
diff --git a/pom.xml b/pom.xml
index 39ed2db..e57fda0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@ under the License.
</scm>
<properties>
- <paimon.version>1.0-SNAPSHOT</paimon.version>
+ <paimon.version>1.0.0</paimon.version>
<target.java.version>21</target.java.version>
<jdk.test.version>21</jdk.test.version>
<trino.version>440</trino.version>
diff --git a/src/main/java/org/apache/paimon/trino/TrinoConnector.java
b/src/main/java/org/apache/paimon/trino/TrinoConnector.java
index dff66e5..36824cf 100644
--- a/src/main/java/org/apache/paimon/trino/TrinoConnector.java
+++ b/src/main/java/org/apache/paimon/trino/TrinoConnector.java
@@ -18,6 +18,8 @@
package org.apache.paimon.trino;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
+
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
@@ -26,10 +28,14 @@ import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.function.FunctionProvider;
+import io.trino.spi.function.table.ConnectorTableFunction;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.transaction.IsolationLevel;
import java.util.List;
+import java.util.Optional;
+import java.util.Set;
import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
@@ -44,6 +50,8 @@ public class TrinoConnector implements Connector {
private final ConnectorNodePartitioningProvider
trinoNodePartitioningProvider;
private final List<PropertyMetadata<?>> tableProperties;
private final List<PropertyMetadata<?>> sessionProperties;
+ private final Set<ConnectorTableFunction> tableFunctions;
+ private final FunctionProvider functionProvider;
public TrinoConnector(
ConnectorMetadata trinoMetadata,
@@ -52,7 +60,9 @@ public class TrinoConnector implements Connector {
ConnectorPageSinkProvider trinoPageSinkProvider,
ConnectorNodePartitioningProvider trinoNodePartitioningProvider,
TrinoTableOptions trinoTableOptions,
- TrinoSessionProperties trinoSessionProperties) {
+ TrinoSessionProperties trinoSessionProperties,
+ Set<ConnectorTableFunction> tableFunctions,
+ FunctionProvider functionProvider) {
this.trinoMetadata = requireNonNull(trinoMetadata, "trinoMetadata is
null");
this.trinoSplitManager = requireNonNull(trinoSplitManager,
"trinoSplitManager is null");
this.trinoPageSourceProvider =
@@ -64,6 +74,9 @@ public class TrinoConnector implements Connector {
trinoNodePartitioningProvider,
"trinoNodePartitioningProvider is null");
this.tableProperties = trinoTableOptions.getTableProperties();
this.sessionProperties = trinoSessionProperties.getSessionProperties();
+ this.tableFunctions =
+ ImmutableSet.copyOf(requireNonNull(tableFunctions,
"tableFunctions is null"));
+ this.functionProvider = requireNonNull(functionProvider,
"functionProvider is null");
}
@Override
@@ -108,4 +121,14 @@ public class TrinoConnector implements Connector {
public ConnectorNodePartitioningProvider getNodePartitioningProvider() {
return trinoNodePartitioningProvider;
}
+
+ @Override
+ public Set<ConnectorTableFunction> getTableFunctions() {
+ return tableFunctions;
+ }
+
+ @Override
+ public Optional<FunctionProvider> getFunctionProvider() {
+ return Optional.of(functionProvider);
+ }
}
diff --git a/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
b/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
index 4327699..2f244b1 100644
--- a/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
+++ b/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
@@ -22,6 +22,7 @@ import org.apache.paimon.utils.StringUtils;
import com.google.inject.Binder;
import com.google.inject.Injector;
+import com.google.inject.Key;
import com.google.inject.Module;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.json.JsonModule;
@@ -40,6 +41,8 @@ import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorFactory;
+import io.trino.spi.function.FunctionProvider;
+import io.trino.spi.function.table.ConnectorTableFunction;
import io.trino.spi.type.TypeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +57,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
/** Trino {@link ConnectorFactory}. */
public class TrinoConnectorFactory implements ConnectorFactory {
@@ -137,6 +141,9 @@ public class TrinoConnectorFactory implements
ConnectorFactory {
TrinoSessionProperties trinoSessionProperties =
injector.getInstance(TrinoSessionProperties.class);
TrinoTableOptions trinoTableOptions =
injector.getInstance(TrinoTableOptions.class);
+ Set<ConnectorTableFunction> connectorTableFunctions =
+ injector.getInstance(new Key<>() {});
+ FunctionProvider functionProvider =
injector.getInstance(FunctionProvider.class);
return new TrinoConnector(
new ClassLoaderSafeConnectorMetadata(trinoMetadata,
classLoader),
@@ -147,7 +154,9 @@ public class TrinoConnectorFactory implements
ConnectorFactory {
trinoPageSinkProvider, classLoader),
trinoNodePartitioningProvider,
trinoTableOptions,
- trinoSessionProperties);
+ trinoSessionProperties,
+ connectorTableFunctions,
+ functionProvider);
}
}
diff --git a/src/main/java/org/apache/paimon/trino/TrinoModule.java
b/src/main/java/org/apache/paimon/trino/TrinoModule.java
index f629736..cbdb74e 100644
--- a/src/main/java/org/apache/paimon/trino/TrinoModule.java
+++ b/src/main/java/org/apache/paimon/trino/TrinoModule.java
@@ -19,13 +19,20 @@
package org.apache.paimon.trino;
import org.apache.paimon.options.Options;
+import org.apache.paimon.trino.functions.TrinoFunctionProvider;
+import
org.apache.paimon.trino.functions.tablechanges.TableChangesFunctionProcessorProvider;
+import
org.apache.paimon.trino.functions.tablechanges.TableChangesFunctionProvider;
import com.google.inject.Binder;
import com.google.inject.Module;
+import com.google.inject.Scopes;
+import io.trino.spi.function.FunctionProvider;
+import io.trino.spi.function.table.ConnectorTableFunction;
import java.util.Map;
import static com.google.inject.Scopes.SINGLETON;
+import static com.google.inject.multibindings.Multibinder.newSetBinder;
/** Module for binding instance. */
public class TrinoModule implements Module {
@@ -45,5 +52,11 @@ public class TrinoModule implements Module {
binder.bind(TrinoNodePartitioningProvider.class).in(SINGLETON);
binder.bind(TrinoSessionProperties.class).in(SINGLETON);
binder.bind(TrinoTableOptions.class).in(SINGLETON);
+ newSetBinder(binder, ConnectorTableFunction.class)
+ .addBinding()
+ .toProvider(TableChangesFunctionProvider.class)
+ .in(Scopes.SINGLETON);
+
binder.bind(FunctionProvider.class).to(TrinoFunctionProvider.class).in(Scopes.SINGLETON);
+ binder.bind(TableChangesFunctionProcessorProvider.class).in(SINGLETON);
}
}
diff --git a/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java
b/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java
index 120b0e9..35b402f 100644
--- a/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java
+++ b/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java
@@ -31,6 +31,7 @@ import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import java.util.List;
import java.util.stream.Collectors;
@@ -57,15 +58,25 @@ public class TrinoSplitManager implements
ConnectorSplitManager {
ConnectorTableHandle table,
DynamicFilter dynamicFilter,
Constraint constraint) {
- return getSplits(table, session);
+ return getSplits((TrinoTableHandle) table, session);
+ }
+
+ @Override
+ public ConnectorSplitSource getSplits(
+ ConnectorTransactionHandle transaction,
+ ConnectorSession session,
+ ConnectorTableFunctionHandle function) {
+ if (function instanceof TrinoTableHandle) {
+ return getSplits((TrinoTableHandle) function, session);
+ }
+ throw new IllegalStateException("Unknown table function: " + function);
}
protected ConnectorSplitSource getSplits(
- ConnectorTableHandle connectorTableHandle, ConnectorSession
session) {
+ TrinoTableHandle tableHandle, ConnectorSession session) {
// TODO dynamicFilter?
// TODO what is constraint?
- TrinoTableHandle tableHandle = (TrinoTableHandle) connectorTableHandle;
Table table = tableHandle.tableWithDynamicOptions(trinoCatalog,
session);
ReadBuilder readBuilder = table.newReadBuilder();
new TrinoFilterConverter(table.rowType())
@@ -89,6 +100,6 @@ public class TrinoSplitManager implements
ConnectorSplitManager {
minimumSplitWeight),
1.0)))
.collect(Collectors.toList()),
- ((TrinoTableHandle) connectorTableHandle).getLimit());
+ tableHandle.getLimit());
}
}
diff --git a/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
b/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
index d5dbf44..163c6a1 100644
--- a/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
+++ b/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
@@ -34,6 +34,7 @@ import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import io.trino.spi.predicate.TupleDomain;
import java.util.Collections;
@@ -47,7 +48,10 @@ import java.util.stream.Collectors;
/** Trino {@link ConnectorTableHandle}. */
public class TrinoTableHandle
- implements ConnectorTableHandle, ConnectorInsertTableHandle,
ConnectorOutputTableHandle {
+ implements ConnectorTableHandle,
+ ConnectorInsertTableHandle,
+ ConnectorOutputTableHandle,
+ ConnectorTableFunctionHandle {
private final String schemaName;
private final String tableName;
@@ -75,7 +79,7 @@ public class TrinoTableHandle
@JsonProperty("tableName") String tableName,
@JsonProperty("dynamicOptions") Map<String, String> dynamicOptions,
@JsonProperty("filter") TupleDomain<TrinoColumnHandle> filter,
- @JsonProperty("projection") Optional<List<ColumnHandle>>
projectedColumns,
+ @JsonProperty("projectedColumns") Optional<List<ColumnHandle>>
projectedColumns,
@JsonProperty("limit") OptionalLong limit) {
this.schemaName = schemaName;
this.tableName = tableName;
diff --git a/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
b/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
index 3504fb5..7eca2f6 100644
--- a/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
+++ b/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
@@ -124,7 +124,7 @@ public class TrinoTableOptionUtils {
}
}
- private static String convertOptionKey(String key) {
+ public static String convertOptionKey(String key) {
String regex = "[.\\-]";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(key);
diff --git a/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
b/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
index debafbc..b6e04c8 100644
--- a/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
+++ b/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
@@ -23,10 +23,10 @@ import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.security.SecurityContext;
@@ -109,6 +109,11 @@ public class TrinoCatalog implements Catalog {
return current.options();
}
+ @Override
+ public boolean caseSensitive() {
+ return current.caseSensitive();
+ }
+
@Override
public FileIO fileIO() {
if (!inited) {
@@ -140,13 +145,14 @@ public class TrinoCatalog implements Catalog {
}
@Override
- public Table getTable(Identifier identifier) throws TableNotExistException
{
- return current.getTable(identifier);
+ public void alterDatabase(String s, List<PropertyChange> list, boolean b)
+ throws DatabaseNotExistException {
+ current.alterDatabase(s, list, b);
}
@Override
- public Path getTableLocation(Identifier identifier) {
- return current.getTableLocation(identifier);
+ public Table getTable(Identifier identifier) throws TableNotExistException
{
+ return current.getTable(identifier);
}
@Override
@@ -190,8 +196,7 @@ public class TrinoCatalog implements Catalog {
}
@Override
- public List<PartitionEntry> listPartitions(Identifier identifier)
- throws TableNotExistException {
+ public List<Partition> listPartitions(Identifier identifier) throws
TableNotExistException {
return current.listPartitions(identifier);
}
@@ -213,9 +218,4 @@ public class TrinoCatalog implements Catalog {
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
current.alterTable(identifier, change, ignoreIfNotExists);
}
-
- @Override
- public boolean allowUpperCase() {
- return current.allowUpperCase();
- }
}
diff --git
a/src/main/java/org/apache/paimon/trino/functions/TrinoFunctionProvider.java
b/src/main/java/org/apache/paimon/trino/functions/TrinoFunctionProvider.java
new file mode 100644
index 0000000..4ced19f
--- /dev/null
+++ b/src/main/java/org/apache/paimon/trino/functions/TrinoFunctionProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino.functions;
+
+import org.apache.paimon.trino.TrinoTableHandle;
+import
org.apache.paimon.trino.functions.tablechanges.TableChangesFunctionProcessorProvider;
+
+import com.google.inject.Inject;
+import
io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionProcessorProvider;
+import io.trino.spi.function.FunctionProvider;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
+import io.trino.spi.function.table.TableFunctionProcessorProvider;
+
+/** TrinoFunctionProvider. */
+public class TrinoFunctionProvider implements FunctionProvider {
+
+ private final TableChangesFunctionProcessorProvider
tableChangesFunctionProcessorProvider;
+
+ @Inject
+ public TrinoFunctionProvider(
+ TableChangesFunctionProcessorProvider
tableChangesFunctionProcessorProvider) {
+ this.tableChangesFunctionProcessorProvider =
tableChangesFunctionProcessorProvider;
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getTableFunctionProcessorProvider(
+ ConnectorTableFunctionHandle functionHandle) {
+ if (functionHandle instanceof TrinoTableHandle) {
+ return new ClassLoaderSafeTableFunctionProcessorProvider(
+ tableChangesFunctionProcessorProvider,
getClass().getClassLoader());
+ }
+ return
FunctionProvider.super.getTableFunctionProcessorProvider(functionHandle);
+ }
+}
diff --git
a/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunction.java
b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunction.java
new file mode 100644
index 0000000..7c1ef88
--- /dev/null
+++
b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunction.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino.functions.tablechanges;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.trino.TrinoColumnHandle;
+import org.apache.paimon.trino.TrinoMetadata;
+import org.apache.paimon.trino.TrinoMetadataFactory;
+import org.apache.paimon.trino.TrinoTableHandle;
+import org.apache.paimon.trino.TrinoTableOptionUtils;
+import org.apache.paimon.trino.TrinoTypeUtils;
+import org.apache.paimon.trino.catalog.TrinoCatalog;
+
+import com.google.inject.Inject;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorAccessControl;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.function.table.AbstractConnectorTableFunction;
+import io.trino.spi.function.table.Argument;
+import io.trino.spi.function.table.Descriptor;
+import io.trino.spi.function.table.ScalarArgument;
+import io.trino.spi.function.table.ScalarArgumentSpecification;
+import io.trino.spi.function.table.TableFunctionAnalysis;
+import io.trino.spi.predicate.TupleDomain;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
+import static
io.trino.spi.function.table.ReturnTypeSpecification.GenericTable.GENERIC_TABLE;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+
+/** TableChangesFunction. */
+public class TableChangesFunction extends AbstractConnectorTableFunction {
+
+ private static final Slice INVALID_VALUE = Slices.utf8Slice("invalid");
+ private static final String FUNCTION_NAME = "table_changes";
+ private static final String SCHEMA_NAME_VAR_NAME = "SCHEMA_NAME";
+ private static final String TABLE_NAME_VAR_NAME = "TABLE_NAME";
+ private static final String INCREMENTAL_BETWEEN_SCAN_MODE =
+
TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE.key())
+ .toUpperCase(ENGLISH);
+ private static final String INCREMENTAL_BETWEEN_TIMESTAMP =
+
TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key())
+ .toUpperCase(ENGLISH);
+ private static final String INCREMENTAL_BETWEEN =
+
TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN.key())
+ .toUpperCase(ENGLISH);
+ private final TrinoMetadata trinoMetadata;
+
+ @Inject
+ public TableChangesFunction(TrinoMetadataFactory trinoMetadataFactory) {
+ super(
+ "system",
+ FUNCTION_NAME,
+ ImmutableList.of(
+ ScalarArgumentSpecification.builder()
+ .name(SCHEMA_NAME_VAR_NAME)
+ .type(VARCHAR)
+ .build(),
+ ScalarArgumentSpecification.builder()
+ .name(TABLE_NAME_VAR_NAME)
+ .type(VARCHAR)
+ .build(),
+ ScalarArgumentSpecification.builder()
+ .name(INCREMENTAL_BETWEEN_SCAN_MODE)
+ .defaultValue(
+ Slices.utf8Slice(
+
CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE
+ .defaultValue()
+ .getValue()))
+ .type(VARCHAR)
+ .build(),
+ ScalarArgumentSpecification.builder()
+ .name(INCREMENTAL_BETWEEN)
+ .defaultValue(INVALID_VALUE)
+ .type(VARCHAR)
+ .build(),
+ ScalarArgumentSpecification.builder()
+ .name(INCREMENTAL_BETWEEN_TIMESTAMP)
+ .defaultValue(INVALID_VALUE)
+ .type(VARCHAR)
+ .build()),
+ GENERIC_TABLE);
+ this.trinoMetadata =
+ requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is
null").create();
+ }
+
+ @Override
+ public TableFunctionAnalysis analyze(
+ ConnectorSession session,
+ ConnectorTransactionHandle transaction,
+ Map<String, Argument> arguments,
+ ConnectorAccessControl accessControl) {
+ String schema = getSchemaName(arguments);
+ String table = getTableName(arguments);
+
+ Slice incrementalBetweenValue =
+ (Slice) ((ScalarArgument)
arguments.get(INCREMENTAL_BETWEEN)).getValue();
+ Slice incrementalBetweenTimestamp =
+ (Slice) ((ScalarArgument)
arguments.get(INCREMENTAL_BETWEEN_TIMESTAMP)).getValue();
+ if (incrementalBetweenValue.equals(INVALID_VALUE)
+ && incrementalBetweenTimestamp.equals(INVALID_VALUE)) {
+ throw new TrinoException(
+ INVALID_FUNCTION_ARGUMENT,
+ "Either "
+ + INCREMENTAL_BETWEEN
+ + " or "
+ + INCREMENTAL_BETWEEN_TIMESTAMP
+ + " must be provided");
+ }
+
+ SchemaTableName schemaTableName = new SchemaTableName(schema, table);
+ try {
+ TrinoCatalog catalog = trinoMetadata.catalog();
+ catalog.initSession(session);
+ Table paimonTable = catalog.getTable(Identifier.create(schema,
table));
+ Map<String, String> options = new HashMap<>(paimonTable.options());
+ if (!incrementalBetweenValue.equals(INVALID_VALUE)) {
+ options.put(
+ CoreOptions.INCREMENTAL_BETWEEN.key(),
+ incrementalBetweenValue.toStringUtf8());
+ }
+ if (!incrementalBetweenTimestamp.equals(INVALID_VALUE)) {
+ options.put(
+ CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key(),
+ incrementalBetweenTimestamp.toStringUtf8());
+ }
+
+ ImmutableList.Builder<Descriptor.Field> columns =
ImmutableList.builder();
+ List<ColumnHandle> projectedColumns = new ArrayList<>();
+ paimonTable.rowType().getFields().stream()
+ .forEach(
+ column -> {
+ columns.add(
+ new Descriptor.Field(
+ column.name(),
+ Optional.of(
+
TrinoTypeUtils.fromPaimonType(
+
column.type()))));
+ projectedColumns.add(
+ TrinoColumnHandle.of(column.name(),
column.type()));
+ });
+ return TableFunctionAnalysis.builder()
+ .returnedType(new Descriptor(columns.build()))
+ .handle(
+ new TrinoTableHandle(
+ schema,
+ table,
+ options,
+ TupleDomain.all(),
+ Optional.of(projectedColumns),
+ OptionalLong.empty()))
+ .build();
+ } catch (Catalog.TableNotExistException e) {
+ throw new TrinoException(
+ INVALID_FUNCTION_ARGUMENT, "Table not found: " +
schemaTableName);
+ }
+ }
+
+ private static String getSchemaName(Map<String, Argument> arguments) {
+ if (argumentExists(arguments, SCHEMA_NAME_VAR_NAME)) {
+ return ((Slice)
+ checkNonNull(
+ ((ScalarArgument)
arguments.get(SCHEMA_NAME_VAR_NAME))
+ .getValue()))
+ .toStringUtf8();
+ }
+ throw new TrinoException(
+ INVALID_FUNCTION_ARGUMENT, SCHEMA_NAME_VAR_NAME + " argument
not found");
+ }
+
+ private static String getTableName(Map<String, Argument> arguments) {
+ if (argumentExists(arguments, TABLE_NAME_VAR_NAME)) {
+ return ((Slice)
+ checkNonNull(
+ ((ScalarArgument)
arguments.get(TABLE_NAME_VAR_NAME))
+ .getValue()))
+ .toStringUtf8();
+ }
+ throw new TrinoException(
+ INVALID_FUNCTION_ARGUMENT, TABLE_NAME_VAR_NAME + " argument
not found");
+ }
+
+ private static boolean argumentExists(Map<String, Argument> arguments,
String key) {
+ Argument argument = arguments.get(key);
+ if (argument instanceof ScalarArgument) {
+ return !((ScalarArgument) argument).getNullableValue().isNull();
+ }
+ throw new IllegalArgumentException("Unsupported argument type: " +
argument);
+ }
+
+ private static Object checkNonNull(Object argumentValue) {
+ if (argumentValue == null) {
+ throw new TrinoException(
+ INVALID_FUNCTION_ARGUMENT, FUNCTION_NAME + " arguments may
not be null");
+ }
+ return argumentValue;
+ }
+}
diff --git
a/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProcessor.java
b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProcessor.java
new file mode 100644
index 0000000..4acdb57
--- /dev/null
+++
b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProcessor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino.functions.tablechanges;
+
+import org.apache.paimon.trino.TrinoPageSourceProvider;
+import org.apache.paimon.trino.TrinoSplit;
+import org.apache.paimon.trino.TrinoTableHandle;
+
+import io.trino.spi.Page;
+import io.trino.spi.connector.ConnectorPageSource;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.DynamicFilter;
+import io.trino.spi.function.table.TableFunctionProcessorState;
+import io.trino.spi.function.table.TableFunctionSplitProcessor;
+
+import static
io.trino.spi.function.table.TableFunctionProcessorState.Finished.FINISHED;
+
+/** TableChangesFunctionProcessor. */
+public class TableChangesFunctionProcessor implements
TableFunctionSplitProcessor {
+
+ private static final Page EMPTY_PAGE = new Page(0);
+
+ private final ConnectorPageSource pageSource;
+
+ public TableChangesFunctionProcessor(
+ ConnectorSession session,
+ TrinoTableHandle handle,
+ TrinoSplit split,
+ TrinoPageSourceProvider pageSourceProvider) {
+ this.pageSource =
+ pageSourceProvider.createPageSource(
+ null,
+ session,
+ split,
+ (ConnectorTableHandle) handle,
+ handle.getProjectedColumns().get(),
+ DynamicFilter.EMPTY);
+ }
+
+ @Override
+ public TableFunctionProcessorState process() {
+ if (pageSource.isFinished()) {
+ return FINISHED;
+ }
+ Page dataPage = pageSource.getNextPage();
+ if (dataPage == null) {
+ return TableFunctionProcessorState.Processed.produced(EMPTY_PAGE);
+ } else {
+ return TableFunctionProcessorState.Processed.produced(dataPage);
+ }
+ }
+}
diff --git
a/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProcessorProvider.java
b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProcessorProvider.java
new file mode 100644
index 0000000..be807d0
--- /dev/null
+++
b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProcessorProvider.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino.functions.tablechanges;
+
+import org.apache.paimon.trino.TrinoPageSourceProvider;
+import org.apache.paimon.trino.TrinoSplit;
+import org.apache.paimon.trino.TrinoTableHandle;
+
+import com.google.inject.Inject;
+import
io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionSplitProcessor;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
+import io.trino.spi.function.table.TableFunctionProcessorProvider;
+import io.trino.spi.function.table.TableFunctionSplitProcessor;
+
+/** TableChangesFunctionProcessorProvider. */
+public class TableChangesFunctionProcessorProvider implements
TableFunctionProcessorProvider {
+
+ private final TrinoPageSourceProvider icebergPageSourceProvider;
+
+ @Inject
+ public TableChangesFunctionProcessorProvider(
+ TrinoPageSourceProvider icebergPageSourceProvider) {
+ this.icebergPageSourceProvider = icebergPageSourceProvider;
+ }
+
+ @Override
+ public TableFunctionSplitProcessor getSplitProcessor(
+ ConnectorSession session, ConnectorTableFunctionHandle handle,
ConnectorSplit split) {
+ return new ClassLoaderSafeTableFunctionSplitProcessor(
+ new TableChangesFunctionProcessor(
+ session,
+ (TrinoTableHandle) handle,
+ (TrinoSplit) split,
+ icebergPageSourceProvider),
+ getClass().getClassLoader());
+ }
+}
diff --git
a/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProvider.java
b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProvider.java
new file mode 100644
index 0000000..95736d7
--- /dev/null
+++
b/src/main/java/org/apache/paimon/trino/functions/tablechanges/TableChangesFunctionProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino.functions.tablechanges;
+
+import org.apache.paimon.trino.TrinoMetadataFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorTableFunction;
+import io.trino.spi.function.table.ConnectorTableFunction;
+
+import static java.util.Objects.requireNonNull;
+
+/** TableChangesFunctionProvider. */
+public class TableChangesFunctionProvider implements
Provider<ConnectorTableFunction> {
+ private final TrinoMetadataFactory trinoMetadataFactory;
+
+ @Inject
+ public TableChangesFunctionProvider(TrinoMetadataFactory
trinoMetadataFactory) {
+ this.trinoMetadataFactory =
+ requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is
null");
+ }
+
+ @Override
+ public ConnectorTableFunction get() {
+ return new ClassLoaderSafeConnectorTableFunction(
+ new TableChangesFunction(trinoMetadataFactory),
getClass().getClassLoader());
+ }
+}
diff --git a/src/test/java/org/apache/paimon/trino/TrinoITCase.java
b/src/test/java/org/apache/paimon/trino/TrinoITCase.java
index 1d460c4..c6f17d2 100644
--- a/src/test/java/org/apache/paimon/trino/TrinoITCase.java
+++ b/src/test/java/org/apache/paimon/trino/TrinoITCase.java
@@ -49,6 +49,7 @@ import org.apache.paimon.types.VarCharType;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
+import io.trino.testing.QueryFailedException;
import io.trino.testing.QueryRunner;
import org.junit.jupiter.api.Test;
@@ -68,6 +69,7 @@ import static
io.trino.testing.TestingSession.testSessionBuilder;
import static java.time.ZoneOffset.UTC;
import static org.apache.paimon.data.BinaryString.fromString;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
/** ITCase for trino connector. */
public class TrinoITCase extends AbstractTestQueryFramework {
@@ -854,6 +856,32 @@ public class TrinoITCase extends
AbstractTestQueryFramework {
.isEqualTo("[[1, 2, 1, 1], [3, 4, 2, 2], [5, 6, 3, 3], [7, 8,
4, 4]]");
}
+ @Test
+ public void testIncrementalRead() {
+ assertThatExceptionOfType(QueryFailedException.class)
+ .isThrownBy(
+ () ->
+ sql(
+ "SELECT * FROM
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2'))"))
+ .withMessage(
+ "Either INCREMENTAL_BETWEEN or
INCREMENTAL_BETWEEN_TIMESTAMP must be provided");
+ assertThat(
+ sql(
+ "SELECT * FROM
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2',incremental_between=>'1,2'))"))
+ .isEqualTo("[[5, 6, 3, 3], [7, 8, 4, 4]]");
+ assertThat(
+ sql(
+ "SELECT * FROM
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2',incremental_between=>'1,tag-2'))"))
+ .isEqualTo("[[5, 6, 3, 3], [7, 8, 4, 4]]");
+ assertThat(
+ sql(
+ "SELECT * FROM
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2',incremental_between_timestamp=>'%s,%s'))"
+ .formatted(
+ t2FirstCommitTimestamp,
+ System.currentTimeMillis())))
+ .isEqualTo("[[5, 6, 3, 3], [7, 8, 4, 4]]");
+ }
+
@Test
public void testTimeTravelWithTag() {
// tag or snapshotId is string