This is an automated email from the ASF dual-hosted git repository. junhao pushed a commit to branch old_version_master in repository https://gitbox.apache.org/repos/asf/paimon-trino.git
The following commit(s) were added to refs/heads/old_version_master by this push: new f67712a [388] Cherry-pick incremental read from https://github.com/apache/paimon-trino/commit/5697da8de19a60e52842fd86d612c99e35aa0877\#diff-e62f488a14d012b280ffd60b07e6a3915757e393f68103b42fd3262adc0b9d61, make it suitable for trino-388 f67712a is described below commit f67712ac0c5651e11952d51fea5d28f3229f419d Author: 仟弋 <yejunhao....@alibaba-inc.com> AuthorDate: Tue Jul 22 11:25:31 2025 +0800 [388] Cherry-pick incremental read from https://github.com/apache/paimon-trino/commit/5697da8de19a60e52842fd86d612c99e35aa0877\#diff-e62f488a14d012b280ffd60b07e6a3915757e393f68103b42fd3262adc0b9d61, make it suitable for trino-388 --- .../apache/paimon/trino/TableChangesFunction.java | 223 ++++++++++ .../paimon/trino/TableChangesFunctionProvider.java | 43 ++ .../org/apache/paimon/trino/TrinoConnector.java | 57 +++ .../apache/paimon/trino/TrinoConnectorBase.java | 96 +++++ .../paimon/trino/TrinoConnectorFactoryBase.java | 97 +++++ .../org/apache/paimon/trino/TrinoMetadataBase.java | 447 +++++++++++++++++++++ .../java/org/apache/paimon/trino/TrinoModule.java | 54 +++ .../apache/paimon/trino/TrinoSplitManagerBase.java | 66 +++ .../org/apache/paimon/trino/TrinoTableHandle.java | 210 ++++++++++ .../apache/paimon/trino/TestTrino388ITCase.java | 26 ++ .../apache/paimon/trino/TrinoTableOptionUtils.java | 2 +- 11 files changed, 1320 insertions(+), 1 deletion(-) diff --git a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunction.java b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunction.java new file mode 100644 index 0000000..94c8da9 --- /dev/null +++ b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunction.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.trino; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.InstantiationUtil; + +import com.google.inject.Inject; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.ptf.AbstractConnectorTableFunction; +import io.trino.spi.ptf.Argument; +import io.trino.spi.ptf.Descriptor; +import io.trino.spi.ptf.ScalarArgument; +import io.trino.spi.ptf.ScalarArgumentSpecification; +import io.trino.spi.ptf.TableFunctionAnalysis; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; + +import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT; +import static io.trino.spi.ptf.ReturnTypeSpecification.GenericTable.GENERIC_TABLE; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Locale.ENGLISH; +import static java.util.Objects.requireNonNull; + +/** TableChangesFunction. */ +public class TableChangesFunction extends AbstractConnectorTableFunction { + + private static final Slice INVALID_VALUE = Slices.utf8Slice("invalid"); + private static final String FUNCTION_NAME = "table_changes"; + private static final String SCHEMA_NAME_VAR_NAME = "SCHEMA_NAME"; + private static final String TABLE_NAME_VAR_NAME = "TABLE_NAME"; + private static final String INCREMENTAL_BETWEEN_SCAN_MODE = + TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE.key()) + .toUpperCase(ENGLISH); + private static final String INCREMENTAL_BETWEEN_TIMESTAMP = + TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key()) + .toUpperCase(ENGLISH); + private static final String INCREMENTAL_BETWEEN = + TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN.key()) + .toUpperCase(ENGLISH); + private final TrinoMetadata trinoMetadata; + + @Inject + public TableChangesFunction(TrinoMetadataFactory trinoMetadataFactory) { + super( + "system", + FUNCTION_NAME, + ImmutableList.of( + ScalarArgumentSpecification.builder() + .name(SCHEMA_NAME_VAR_NAME) + .type(VARCHAR) + .build(), + ScalarArgumentSpecification.builder() + .name(TABLE_NAME_VAR_NAME) + .type(VARCHAR) + .build(), + ScalarArgumentSpecification.builder() + .name(INCREMENTAL_BETWEEN_SCAN_MODE) + .defaultValue( + Slices.utf8Slice( + CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE + .defaultValue() + .toString())) + .type(VARCHAR) + .build(), + ScalarArgumentSpecification.builder() + .name(INCREMENTAL_BETWEEN) + .defaultValue(INVALID_VALUE) + .type(VARCHAR) + .build(), + ScalarArgumentSpecification.builder() + .name(INCREMENTAL_BETWEEN_TIMESTAMP) + .defaultValue(INVALID_VALUE) + .type(VARCHAR) + .build()), + GENERIC_TABLE); + this.trinoMetadata = + requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is null").create(); + } + + @Override + public TableFunctionAnalysis analyze( + ConnectorSession session, + ConnectorTransactionHandle transaction, + Map<String, Argument> arguments) { + String schema = getSchemaName(arguments); + String table = getTableName(arguments); + + Slice incrementalBetweenValue = + (Slice) ((ScalarArgument) arguments.get(INCREMENTAL_BETWEEN)).getValue(); + Slice incrementalBetweenTimestamp = + (Slice) ((ScalarArgument) arguments.get(INCREMENTAL_BETWEEN_TIMESTAMP)).getValue(); + if (incrementalBetweenValue.equals(INVALID_VALUE) + && incrementalBetweenTimestamp.equals(INVALID_VALUE)) { + throw new TrinoException( + INVALID_FUNCTION_ARGUMENT, + "Either " + + INCREMENTAL_BETWEEN + + " or " + + INCREMENTAL_BETWEEN_TIMESTAMP + + " must be provided"); + } + + SchemaTableName schemaTableName = new SchemaTableName(schema, table); + try { + TrinoTableHandle trinoTableHandle = + trinoMetadata.getTableHandle(session, schemaTableName); + Table paimonTable = trinoTableHandle.table(); + Map<String, String> options = new HashMap<>(paimonTable.options()); + if (!incrementalBetweenValue.equals(INVALID_VALUE)) { + options.put( + CoreOptions.INCREMENTAL_BETWEEN.key(), + incrementalBetweenValue.toStringUtf8()); + } + if (!incrementalBetweenTimestamp.equals(INVALID_VALUE)) { + options.put( + CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key(), + incrementalBetweenTimestamp.toStringUtf8()); + } + paimonTable = paimonTable.copy(options); + + ImmutableList.Builder<Descriptor.Field> columns = ImmutableList.builder(); + List<ColumnHandle> projectedColumns = new ArrayList<>(); + paimonTable.rowType().getFields().stream() + .forEach( + column -> { + columns.add( + new Descriptor.Field( + column.name(), + Optional.of( + TrinoTypeUtils.fromPaimonType( + column.type())))); + projectedColumns.add( + TrinoColumnHandle.of( + column.name().toLowerCase(), column.type())); + }); + return TableFunctionAnalysis.builder() + .returnedType(new Descriptor(columns.build())) + .handle( + new TrinoTableHandle( + schema, + table, + InstantiationUtil.serializeObject(paimonTable), + TupleDomain.all(), + Optional.of(projectedColumns), + OptionalLong.empty())) + .build(); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize table", e); + } + } + + private static String getSchemaName(Map<String, Argument> arguments) { + if (argumentExists(arguments, SCHEMA_NAME_VAR_NAME)) { + return ((Slice) + checkNonNull( + ((ScalarArgument) arguments.get(SCHEMA_NAME_VAR_NAME)) + .getValue())) + .toStringUtf8(); + } + throw new TrinoException( + INVALID_FUNCTION_ARGUMENT, SCHEMA_NAME_VAR_NAME + " argument not found"); + } + + private static String getTableName(Map<String, Argument> arguments) { + if (argumentExists(arguments, TABLE_NAME_VAR_NAME)) { + return ((Slice) + checkNonNull( + ((ScalarArgument) arguments.get(TABLE_NAME_VAR_NAME)) + .getValue())) + .toStringUtf8(); + } + throw new TrinoException( + INVALID_FUNCTION_ARGUMENT, TABLE_NAME_VAR_NAME + " argument not found"); + } + + private static boolean argumentExists(Map<String, Argument> arguments, String key) { + Argument argument = arguments.get(key); + if (argument instanceof ScalarArgument) { + return !(((ScalarArgument) argument).getValue() == null); + } + throw new IllegalArgumentException("Unsupported argument type: " + argument); + } + + private static Object checkNonNull(Object argumentValue) { + if (argumentValue == null) { + throw new TrinoException( + INVALID_FUNCTION_ARGUMENT, FUNCTION_NAME + " arguments may not be null"); + } + return argumentValue; + } +} diff --git a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunctionProvider.java b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunctionProvider.java new file mode 100644 index 0000000..0f9f8a9 --- /dev/null +++ b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunctionProvider.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.trino; + +import com.google.inject.Inject; +import com.google.inject.Provider; +import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorTableFunction; +import io.trino.spi.ptf.ConnectorTableFunction; + +import static java.util.Objects.requireNonNull; + +/** TableChangesFunctionProvider. */ +public class TableChangesFunctionProvider implements Provider<ConnectorTableFunction> { + private final TrinoMetadataFactory trinoMetadataFactory; + + @Inject + public TableChangesFunctionProvider(TrinoMetadataFactory trinoMetadataFactory) { + this.trinoMetadataFactory = + requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is null"); + } + + @Override + public ConnectorTableFunction get() { + return new ClassLoaderSafeConnectorTableFunction( + new TableChangesFunction(trinoMetadataFactory), getClass().getClassLoader()); + } +} diff --git a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnector.java b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnector.java new file mode 100644 index 0000000..043616f --- /dev/null +++ b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnector.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.trino; + +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.ptf.ConnectorTableFunction; +import io.trino.spi.transaction.IsolationLevel; + +import java.util.Set; + +/** Trino {@link Connector}. */ +public class TrinoConnector extends TrinoConnectorBase { + + public TrinoConnector( + TrinoMetadataBase trinoMetadata, + TrinoSplitManagerBase trinoSplitManager, + TrinoPageSourceProvider trinoPageSourceProvider, + TrinoTableOptions trinoTableOptions, + TrinoSessionProperties trinoSessionProperties, + Set<ConnectorTableFunction> connectorTableFunctions) { + super( + trinoMetadata, + trinoSplitManager, + trinoPageSourceProvider, + trinoTableOptions, + trinoSessionProperties, + connectorTableFunctions); + } + + @Override + public ConnectorTransactionHandle beginTransaction( + IsolationLevel isolationLevel, boolean readOnly) { + return beginTransactionBase(isolationLevel, readOnly); + } + + @Override + public TrinoMetadataBase getMetadata(ConnectorTransactionHandle transactionHandle) { + return getMetadataBase(transactionHandle); + } +} diff --git a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorBase.java b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorBase.java new file mode 100644 index 0000000..24d6aeb --- /dev/null +++ b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorBase.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.trino; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet; + +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.ptf.ConnectorTableFunction; +import io.trino.spi.session.PropertyMetadata; +import io.trino.spi.transaction.IsolationLevel; + +import java.util.List; +import java.util.Set; + +import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED; +import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports; +import static java.util.Objects.requireNonNull; + +/** Trino {@link Connector}. */ +public abstract class TrinoConnectorBase implements Connector { + private final TrinoMetadataBase trinoMetadata; + private final TrinoSplitManagerBase trinoSplitManager; + private final TrinoPageSourceProvider trinoPageSourceProvider; + private final List<PropertyMetadata<?>> tableProperties; + private final List<PropertyMetadata<?>> sessionProperties; + private final Set<ConnectorTableFunction> tableFunctions; + + public TrinoConnectorBase( + TrinoMetadataBase trinoMetadata, + TrinoSplitManagerBase trinoSplitManager, + TrinoPageSourceProvider trinoPageSourceProvider, + TrinoTableOptions trinoTableOptions, + TrinoSessionProperties trinoSessionProperties, + Set<ConnectorTableFunction> tableFunctions) { + this.trinoMetadata = requireNonNull(trinoMetadata, "jmxMetadata is null"); + this.trinoSplitManager = requireNonNull(trinoSplitManager, "jmxSplitManager is null"); + this.trinoPageSourceProvider = + requireNonNull(trinoPageSourceProvider, "jmxRecordSetProvider is null"); + this.tableProperties = trinoTableOptions.getTableProperties(); + sessionProperties = trinoSessionProperties.getSessionProperties(); + this.tableFunctions = + ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null")); + } + + protected ConnectorTransactionHandle beginTransactionBase( + IsolationLevel isolationLevel, boolean readOnly) { + checkConnectorSupports(READ_COMMITTED, isolationLevel); + return TrinoTransactionHandle.INSTANCE; + } + + protected TrinoMetadataBase getMetadataBase(ConnectorTransactionHandle transactionHandle) { + return trinoMetadata; + } + + @Override + public TrinoSplitManagerBase getSplitManager() { + return trinoSplitManager; + } + + @Override + public TrinoPageSourceProvider getPageSourceProvider() { + return trinoPageSourceProvider; + } + + @Override + public List<PropertyMetadata<?>> getSessionProperties() { + return sessionProperties; + } + + @Override + public List<PropertyMetadata<?>> getTableProperties() { + return tableProperties; + } + + @Override + public Set<ConnectorTableFunction> getTableFunctions() { + return tableFunctions; + } +} diff --git a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorFactoryBase.java b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorFactoryBase.java new file mode 100644 index 0000000..8a54f1d --- /dev/null +++ b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorFactoryBase.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.trino; + +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import io.airlift.bootstrap.Bootstrap; +import io.airlift.json.JsonModule; +import io.trino.plugin.hive.HiveHdfsModule; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.hive.authentication.HdfsAuthenticationModule; +import io.trino.spi.classloader.ThreadContextClassLoader; +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorContext; +import io.trino.spi.connector.ConnectorFactory; +import io.trino.spi.ptf.ConnectorTableFunction; +import io.trino.spi.type.TypeManager; + +import java.util.Map; +import java.util.Set; + +/** Trino {@link ConnectorFactory}. */ +public abstract class TrinoConnectorFactoryBase implements ConnectorFactory { + + @Override + public String getName() { + return "paimon"; + } + + @Override + public Connector create( + String catalogName, Map<String, String> config, ConnectorContext context) { + + try (ThreadContextClassLoader ignored = + new ThreadContextClassLoader(TrinoConnectorFactoryBase.class.getClassLoader())) { + Bootstrap app = new Bootstrap(modules(catalogName, config, context)); + + Injector injector = + app.doNotInitializeLogging() + .setRequiredConfigurationProperties(Map.of()) + .setOptionalConfigurationProperties(config) + .initialize(); + + TrinoMetadata trinoMetadata = injector.getInstance(TrinoMetadataFactory.class).create(); + TrinoSplitManager trinoSplitManager = injector.getInstance(TrinoSplitManager.class); + TrinoPageSourceProvider trinoPageSourceProvider = + injector.getInstance(TrinoPageSourceProvider.class); + TrinoSessionProperties trinoSessionProperties = + injector.getInstance(TrinoSessionProperties.class); + TrinoTableOptions trinoTableOptions = injector.getInstance(TrinoTableOptions.class); + Set<ConnectorTableFunction> connectorTableFunctions = + injector.getInstance(new Key<>() {}); + + return new TrinoConnector( + trinoMetadata, + trinoSplitManager, + trinoPageSourceProvider, + trinoTableOptions, + trinoSessionProperties, + connectorTableFunctions); + } + } + + protected Module[] modules( + String catalogName, Map<String, String> config, ConnectorContext context) { + return new Module[] { + new JsonModule(), + new TrinoModule(config), + new HiveHdfsModule(), + new HdfsAuthenticationModule(), + binder -> { + binder.bind(NodeVersion.class) + .toInstance( + new NodeVersion( + context.getNodeManager().getCurrentNode().getVersion())); + binder.bind(TypeManager.class).toInstance(context.getTypeManager()); + } + }; + } +} diff --git a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java new file mode 100644 index 0000000..dfcc435 --- /dev/null +++ b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.trino; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.InstantiationUtil; +import org.apache.paimon.utils.StringUtils; + +import io.trino.spi.connector.Assignment; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.ConnectorTableProperties; +import io.trino.spi.connector.Constraint; +import io.trino.spi.connector.ConstraintApplicationResult; +import io.trino.spi.connector.LimitApplicationResult; +import io.trino.spi.connector.ProjectionApplicationResult; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.SchemaTablePrefix; +import io.trino.spi.connector.TableFunctionApplicationResult; +import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.predicate.Domain; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.ptf.ConnectorTableFunctionHandle; +import io.trino.spi.security.TrinoPrincipal; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Trino {@link ConnectorMetadata}. */ +public abstract class TrinoMetadataBase implements ConnectorMetadata { + + protected final Catalog catalog; + + public TrinoMetadataBase(Catalog catalog) { + this.catalog = catalog; + } + + @Override + public List<String> listSchemaNames(ConnectorSession session) { + return listSchemaNames(); + } + + private List<String> listSchemaNames() { + return catalog.listDatabases(); + } + + @Override + public void createSchema( + ConnectorSession session, + String schemaName, + Map<String, Object> properties, + TrinoPrincipal owner) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(schemaName), + "schemaName cannot be null or empty"); + + try { + catalog.createDatabase(schemaName, true); + } catch (Catalog.DatabaseAlreadyExistException e) { + throw new RuntimeException(format("database already existed: '%s'", schemaName)); + } + } + + @Override + public void dropSchema(ConnectorSession session, String schemaName) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(schemaName), + "schemaName cannot be null or empty"); + try { + catalog.dropDatabase(schemaName, false, true); + } catch (Catalog.DatabaseNotEmptyException e) { + throw new RuntimeException(format("database is not empty: '%s'", schemaName)); + } catch (Catalog.DatabaseNotExistException e) { + throw new RuntimeException(format("database not exists: '%s'", schemaName)); + } + } + + @Override + public TrinoTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { + return getTableHandle(tableName, null); + } + + @Override + public ConnectorTableProperties getTableProperties( + ConnectorSession session, ConnectorTableHandle table) { + return new ConnectorTableProperties(); + } + + public TrinoTableHandle getTableHandle( + SchemaTableName tableName, Map<String, String> dynamicOptions) { + Identifier tablePath = new Identifier(tableName.getSchemaName(), tableName.getTableName()); + byte[] serializedTable; + try { + Table table = catalog.getTable(tablePath); + if (dynamicOptions != null && !dynamicOptions.isEmpty()) { + table = table.copy(dynamicOptions); + } + serializedTable = InstantiationUtil.serializeObject(table); + } catch (Catalog.TableNotExistException e) { + return null; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return new TrinoTableHandle( + tableName.getSchemaName(), tableName.getTableName(), serializedTable); + } + + @Override + public ConnectorTableMetadata getTableMetadata( + ConnectorSession session, ConnectorTableHandle tableHandle) { + return ((TrinoTableHandle) tableHandle).tableMetadata(); + } + + @Override + public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName) { + List<SchemaTableName> tables = new ArrayList<>(); + schemaName + .map(Collections::singletonList) + .orElseGet(catalog::listDatabases) + .forEach(schema -> tables.addAll(listTables(schema))); + return tables; + } + + private List<SchemaTableName> listTables(String schema) { + try { + return catalog.listTables(schema).stream() + .map(table -> new SchemaTableName(schema, table)) + .collect(toList()); + } catch (Catalog.DatabaseNotExistException e) { + throw new RuntimeException(e); + } + } + + @Override + public void createTable( + ConnectorSession session, + ConnectorTableMetadata tableMetadata, + boolean ignoreExisting) { + SchemaTableName table = tableMetadata.getTable(); + Identifier identifier = Identifier.create(table.getSchemaName(), table.getTableName()); + + try { + catalog.createTable(identifier, prepareSchema(tableMetadata), false); + } catch (Catalog.DatabaseNotExistException e) { + throw new RuntimeException(format("database not exists: '%s'", table.getSchemaName())); + } catch (Catalog.TableAlreadyExistException e) { + throw new RuntimeException(format("table already existed: '%s'", table.getTableName())); + } + } + + private Schema prepareSchema(ConnectorTableMetadata tableMetadata) { + Map<String, Object> properties = new HashMap<>(tableMetadata.getProperties()); + Schema.Builder builder = + Schema.newBuilder() + .primaryKey(TrinoTableOptions.getPrimaryKeys(properties)) + .partitionKeys(TrinoTableOptions.getPartitionedKeys(properties)); + + for (ColumnMetadata column : tableMetadata.getColumns()) { + builder.column( + column.getName(), + TrinoTypeUtils.toPaimonType(column.getType()), + column.getComment()); + } + + TrinoTableOptionUtils.buildOptions(builder, properties); + + return builder.build(); + } + + @Override + public void renameTable( + ConnectorSession session, + ConnectorTableHandle tableHandle, + SchemaTableName newTableName) { + TrinoTableHandle oldTableHandle = (TrinoTableHandle) tableHandle; + try { + catalog.renameTable( + new Identifier(oldTableHandle.getSchemaName(), oldTableHandle.getTableName()), + new Identifier(newTableName.getSchemaName(), newTableName.getTableName()), + false); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException( + format("table not exists: '%s'", oldTableHandle.getTableName())); + } catch (Catalog.TableAlreadyExistException e) { + throw new RuntimeException( + format("table already existed: '%s'", newTableName.getTableName())); + } + } + + @Override + public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) { + TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle; + try { + catalog.dropTable( + new Identifier( + trinoTableHandle.getSchemaName(), trinoTableHandle.getTableName()), + false); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException( + format("table not exists: '%s'", trinoTableHandle.getTableName())); + } + } + + @Override + public Map<String, ColumnHandle> getColumnHandles( + ConnectorSession session, ConnectorTableHandle tableHandle) { + TrinoTableHandle table = (TrinoTableHandle) tableHandle; + Map<String, ColumnHandle> handleMap = new HashMap<>(); + for (ColumnMetadata column : table.columnMetadatas()) { + handleMap.put(column.getName(), table.columnHandle(column.getName())); + } + return handleMap; + } + + @Override + public ColumnMetadata getColumnMetadata( + ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) { + return ((TrinoColumnHandle) columnHandle).getColumnMetadata(); + } + + @Override + public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns( + ConnectorSession session, SchemaTablePrefix prefix) { + requireNonNull(prefix, "prefix is null"); + List<SchemaTableName> tableNames; + if (prefix.getTable().isPresent()) { + tableNames = Collections.singletonList(prefix.toSchemaTableName()); + } else { + tableNames = listTables(session, prefix.getSchema()); + } + + return tableNames.stream() + .collect( + toMap( + Function.identity(), + table -> getTableHandle(session, table).columnMetadatas())); + } + + @Override + public void addColumn( + ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column) { + TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle; + Identifier identifier = + new Identifier(trinoTableHandle.getSchemaName(), trinoTableHandle.getTableName()); + List<SchemaChange> changes = new ArrayList<>(); + changes.add( + SchemaChange.addColumn( + column.getName(), TrinoTypeUtils.toPaimonType(column.getType()))); + try { + catalog.alterTable(identifier, changes, false); + } catch (Exception e) { + throw new RuntimeException( + format("failed to alter table: '%s'", trinoTableHandle.getTableName()), e); + } + } + + @Override + public void renameColumn( + ConnectorSession session, + ConnectorTableHandle tableHandle, + ColumnHandle source, + String target) { + TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle; + Identifier identifier = + new Identifier(trinoTableHandle.getSchemaName(), trinoTableHandle.getTableName()); + TrinoColumnHandle trinoColumnHandle = (TrinoColumnHandle) source; + List<SchemaChange> changes = new ArrayList<>(); + changes.add(SchemaChange.renameColumn(trinoColumnHandle.getColumnName(), target)); + try { + catalog.alterTable(identifier, changes, false); + } catch (Exception e) { + throw new RuntimeException( + format("failed to alter table: '%s'", trinoTableHandle.getTableName()), e); + } + } + + @Override + public void dropColumn( + ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column) { + TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle; + Identifier identifier = + new Identifier(trinoTableHandle.getSchemaName(), trinoTableHandle.getTableName()); + TrinoColumnHandle trinoColumnHandle = (TrinoColumnHandle) column; + List<SchemaChange> changes = new ArrayList<>(); + changes.add(SchemaChange.dropColumn(trinoColumnHandle.getColumnName())); + try { + catalog.alterTable(identifier, changes, false); + } catch (Exception e) { + throw new RuntimeException( + format("failed to alter table: '%s'", trinoTableHandle.getTableName()), e); + } + } + + @Override + public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter( + ConnectorSession session, ConnectorTableHandle handle, Constraint constraint) { + TrinoTableHandle trinoTableHandle = (TrinoTableHandle) handle; + TupleDomain<TrinoColumnHandle> oldFilter = trinoTableHandle.getFilter(); + TupleDomain<TrinoColumnHandle> newFilter = + constraint + .getSummary() + .transformKeys(TrinoColumnHandle.class::cast) + .intersect(oldFilter); + if (oldFilter.equals(newFilter)) { + return Optional.empty(); + } + + LinkedHashMap<TrinoColumnHandle, Domain> acceptedDomains = new LinkedHashMap<>(); + LinkedHashMap<TrinoColumnHandle, Domain> unsupportedDomains = new LinkedHashMap<>(); + new TrinoFilterConverter(trinoTableHandle.table().rowType()) + .convert(newFilter, acceptedDomains, unsupportedDomains); + + List<String> partitionKeys = trinoTableHandle.table().partitionKeys(); + LinkedHashMap<TrinoColumnHandle, Domain> unenforcedDomains = new LinkedHashMap<>(); + acceptedDomains.forEach( + (columnHandle, domain) -> { + if (!partitionKeys.contains(columnHandle.getColumnName())) { + unenforcedDomains.put(columnHandle, domain); + } + }); + + @SuppressWarnings({"unchecked", "rawtypes"}) + TupleDomain<ColumnHandle> remain = + (TupleDomain) + TupleDomain.withColumnDomains(unsupportedDomains) + .intersect(TupleDomain.withColumnDomains(unenforcedDomains)); + + return Optional.of( + new ConstraintApplicationResult<>(trinoTableHandle.copy(newFilter), remain, false)); + } + + @Override + public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjection( + ConnectorSession session, + ConnectorTableHandle handle, + List<ConnectorExpression> projections, + Map<String, ColumnHandle> assignments) { + TrinoTableHandle trinoTableHandle = (TrinoTableHandle) handle; + List<ColumnHandle> newColumns = new ArrayList<>(assignments.values()); + + if (trinoTableHandle.getProjectedColumns().isPresent() + && containSameElements(newColumns, trinoTableHandle.getProjectedColumns().get())) { + return Optional.empty(); + } + + List<Assignment> assignmentList = new ArrayList<>(); + assignments.forEach( + (name, column) -> + assignmentList.add( + new Assignment( + name, + column, + ((TrinoColumnHandle) column).getTrinoType()))); + + return Optional.of( + new ProjectionApplicationResult<>( + trinoTableHandle.copy(Optional.of(newColumns)), + projections, + assignmentList, + false)); + } + + private static boolean containSameElements( + List<? extends ColumnHandle> first, List<? extends ColumnHandle> second) { + return new HashSet<>(first).equals(new HashSet<>(second)); + } + + @Override + public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit( + ConnectorSession session, ConnectorTableHandle handle, long limit) { + TrinoTableHandle table = (TrinoTableHandle) handle; + + if (table.getLimit().isPresent() && table.getLimit().getAsLong() <= limit) { + return Optional.empty(); + } + + if (!table.getFilter().isAll()) { + LinkedHashMap<TrinoColumnHandle, Domain> acceptedDomains = new LinkedHashMap<>(); + LinkedHashMap<TrinoColumnHandle, Domain> unsupportedDomains = new LinkedHashMap<>(); + new TrinoFilterConverter(table.table().rowType()) + .convert(table.getFilter(), acceptedDomains, unsupportedDomains); + Set<String> acceptedFields = + acceptedDomains.keySet().stream() + .map(TrinoColumnHandle::getColumnName) + .collect(Collectors.toSet()); + if (unsupportedDomains.size() > 0 + || !table.table().partitionKeys().containsAll(acceptedFields)) { + return Optional.empty(); + } + } + + table = table.copy(OptionalLong.of(limit)); + + return Optional.of(new LimitApplicationResult<>(table, false, false)); + } + + @Override + public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTableFunction( + ConnectorSession session, ConnectorTableFunctionHandle handle) { + return Optional.of( + new TableFunctionApplicationResult( + handle, ((TrinoTableHandle) handle).getProjectedColumns().get())); + } +} diff --git a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoModule.java b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoModule.java new file mode 100644 index 0000000..b356b71 --- /dev/null +++ b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoModule.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.trino; + +import org.apache.paimon.options.Options; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; +import io.trino.spi.ptf.ConnectorTableFunction; + +import java.util.Map; + +import static com.google.inject.Scopes.SINGLETON; +import static com.google.inject.multibindings.Multibinder.newSetBinder; + +/** Module for binding instance. */ +public class TrinoModule implements Module { + private Map<String, String> config; + + public TrinoModule(Map<String, String> config) { + this.config = config; + } + + @Override + public void configure(Binder binder) { + binder.bind(Options.class).toInstance(new Options(config)); + binder.bind(TrinoMetadataFactory.class).in(SINGLETON); + binder.bind(TrinoSplitManager.class).in(SINGLETON); + binder.bind(TrinoPageSourceProvider.class).in(SINGLETON); + binder.bind(TrinoSessionProperties.class).in(SINGLETON); + binder.bind(TrinoTableOptions.class).in(SINGLETON); + newSetBinder(binder, ConnectorTableFunction.class) + .addBinding() + .toProvider(TableChangesFunctionProvider.class) + .in(Scopes.SINGLETON); + } +} diff --git a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoSplitManagerBase.java b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoSplitManagerBase.java new file mode 100644 index 0000000..61a26d2 --- /dev/null +++ b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoSplitManagerBase.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.trino; + +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; + +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.connector.ConnectorTableHandle; + +import java.util.List; +import java.util.stream.Collectors; + +/** Trino {@link ConnectorSplitManager}. */ +public abstract class TrinoSplitManagerBase implements ConnectorSplitManager { + + protected ConnectorSplitSource getSplits( + ConnectorTableHandle connectorTableHandle, ConnectorSession session) { + // TODO dynamicFilter? + // TODO what is constraint? + + TrinoTableHandle tableHandle = (TrinoTableHandle) connectorTableHandle; + Table table = tableHandle.tableWithDynamicOptions(session); + ReadBuilder readBuilder = table.newReadBuilder(); + new TrinoFilterConverter(table.rowType()) + .convert(tableHandle.getFilter()) + .ifPresent(readBuilder::withFilter); + tableHandle.getLimit().ifPresent(limit -> readBuilder.withLimit((int) limit)); + List<Split> splits = readBuilder.newScan().plan().splits(); + + long maxRowCount = splits.stream().mapToLong(Split::rowCount).max().orElse(0L); + double minimumSplitWeight = TrinoSessionProperties.getMinimumSplitWeight(session); + return new TrinoSplitSource( + splits.stream() + .map( + split -> + TrinoSplit.fromSplit( + split, + Math.min( + Math.max( + (double) split.rowCount() + / maxRowCount, + minimumSplitWeight), + 1.0))) + .collect(Collectors.toList())); + } +} diff --git a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java new file mode 100644 index 0000000..a9d0a6d --- /dev/null +++ b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.trino; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.InstantiationUtil; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.ptf.ConnectorTableFunctionHandle; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.stream.Collectors; + +/** Trino {@link ConnectorTableHandle}. */ +public final class TrinoTableHandle implements ConnectorTableHandle, ConnectorTableFunctionHandle { + + private final String schemaName; + private final String tableName; + private final byte[] serializedTable; + private final TupleDomain<TrinoColumnHandle> filter; + private final Optional<List<ColumnHandle>> projectedColumns; + private final OptionalLong limit; + + private Table lazyTable; + + public TrinoTableHandle(String schemaName, String tableName, byte[] serializedTable) { + this( + schemaName, + tableName, + serializedTable, + TupleDomain.all(), + Optional.empty(), + OptionalLong.empty()); + } + + @JsonCreator + public TrinoTableHandle( + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") String tableName, + @JsonProperty("serializedTable") byte[] serializedTable, + @JsonProperty("filter") TupleDomain<TrinoColumnHandle> filter, + @JsonProperty("projection") Optional<List<ColumnHandle>> projectedColumns, + @JsonProperty("limit") OptionalLong limit) { + this.schemaName = schemaName; + this.tableName = tableName; + this.serializedTable = serializedTable; + this.filter = filter; + this.projectedColumns = projectedColumns; + this.limit = limit; + } + + @JsonProperty + public String getSchemaName() { + return schemaName; + } + + @JsonProperty + public String getTableName() { + return tableName; + } + + @JsonProperty + public byte[] getSerializedTable() { + return serializedTable; + } + + @JsonProperty + public TupleDomain<TrinoColumnHandle> getFilter() { + return filter; + } + + @JsonProperty + public Optional<List<ColumnHandle>> getProjectedColumns() { + return projectedColumns; + } + + public OptionalLong getLimit() { + return limit; + } + + public TrinoTableHandle copy(TupleDomain<TrinoColumnHandle> filter) { + return new TrinoTableHandle( + schemaName, tableName, serializedTable, filter, projectedColumns, limit); + } + + public TrinoTableHandle copy(Optional<List<ColumnHandle>> projectedColumns) { + return new TrinoTableHandle( + schemaName, tableName, serializedTable, filter, projectedColumns, limit); + } + + public TrinoTableHandle copy(OptionalLong limit) { + return new TrinoTableHandle( + schemaName, tableName, serializedTable, filter, projectedColumns, limit); + } + + public Table tableWithDynamicOptions(ConnectorSession session) { + // see TrinoConnector.getSessionProperties + Map<String, String> dynamicOptions = new HashMap<>(); + Long scanTimestampMills = TrinoSessionProperties.getScanTimestampMillis(session); + if (scanTimestampMills != null) { + dynamicOptions.put( + CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), scanTimestampMills.toString()); + } + Long scanSnapshotId = TrinoSessionProperties.getScanSnapshotId(session); + if (scanSnapshotId != null) { + dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), scanSnapshotId.toString()); + } + + return dynamicOptions.size() > 0 ? table().copy(dynamicOptions) : table(); + } + + public Table table() { + if (lazyTable == null) { + try { + lazyTable = + InstantiationUtil.deserializeObject( + serializedTable, this.getClass().getClassLoader()); + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + return lazyTable; + } + + public ConnectorTableMetadata tableMetadata() { + return new ConnectorTableMetadata( + SchemaTableName.schemaTableName(schemaName, tableName), + columnMetadatas(), + Collections.emptyMap(), + Optional.empty()); + } + + public List<ColumnMetadata> columnMetadatas() { + return table().rowType().getFields().stream() + .map( + column -> + ColumnMetadata.builder() + .setName(column.name()) + .setType(TrinoTypeUtils.fromPaimonType(column.type())) + .setNullable(column.type().isNullable()) + .setComment(Optional.ofNullable(column.description())) + .build()) + .collect(Collectors.toList()); + } + + public TrinoColumnHandle columnHandle(String field) { + List<String> fieldNames = FieldNameUtils.fieldNames(table().rowType()); + int index = fieldNames.indexOf(field); + if (index == -1) { + throw new RuntimeException( + String.format("Cannot find field %s in schema %s", field, fieldNames)); + } + return TrinoColumnHandle.of(field, table().rowType().getTypeAt(index)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TrinoTableHandle that = (TrinoTableHandle) o; + return Arrays.equals(serializedTable, that.serializedTable) + && Objects.equals(schemaName, that.schemaName) + && Objects.equals(tableName, that.tableName) + && Objects.equals(filter, that.filter) + && Objects.equals(projectedColumns, that.projectedColumns); + } + + @Override + public int hashCode() { + return Objects.hash( + schemaName, tableName, filter, projectedColumns, Arrays.hashCode(serializedTable)); + } +} diff --git a/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java b/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java index decefcf..073b4bd 100644 --- a/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java +++ b/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java @@ -18,9 +18,35 @@ package org.apache.paimon.trino; +import org.testng.annotations.Test; + +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + /** {@link TestTrinoITCase} for Trino 388. */ public class TestTrino388ITCase extends TestTrinoITCase { public TestTrino388ITCase() { super(388); } + + @Test + public void testIncrementalRead() { + assertThatExceptionOfType(RuntimeException.class) + .isThrownBy( + () -> + sql( + "SELECT * FROM TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2'))")) + .withMessage( + "Either INCREMENTAL_BETWEEN or INCREMENTAL_BETWEEN_TIMESTAMP must be provided"); + assertThat( + sql( + "SELECT * FROM TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2',incremental_between=>'1,2'))")) + .isEqualTo("[[5, 6, 3, 3], [7, 8, 4, 4]]"); + assertThat( + sql( + String.format( + "SELECT * FROM TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2',incremental_between_timestamp=>'%s,%s'))", + t2FirstCommitTimestamp, System.currentTimeMillis()))) + .isEqualTo("[[5, 6, 3, 3], [7, 8, 4, 4]]"); + } } diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java index bcde932..b22e3b1 100644 --- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java +++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java @@ -125,7 +125,7 @@ public class TrinoTableOptionUtils { } } - private static String convertOptionKey(String key) { + public static String convertOptionKey(String key) { String regex = "[.\\-]"; Pattern pattern = Pattern.compile(regex); Matcher matcher = pattern.matcher(key);