This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git
The following commit(s) were added to refs/heads/main by this push:
new e3c4d9e Remove timeWithTimeZone test, fix error of upperCase
error(column name aCa will fail to select). (#65)
e3c4d9e is described below
commit e3c4d9e50268529946eb98b91fb5fb5874e7f885
Author: YeJunHao <[email protected]>
AuthorDate: Fri Apr 12 16:56:41 2024 +0800
Remove timeWithTimeZone test, fix error of upperCase error(column name aCa
will fail to select). (#65)
---
.../trino/InternalPaimonConnectorFactory.java | 114 +++++++++++++++++++++
.../org/apache/paimon/trino/TrinoConnector.java | 37 +++----
.../apache/paimon/trino/TrinoConnectorFactory.java | 97 ++++++++----------
.../paimon/trino/TrinoPageSourceProvider.java | 2 +-
.../org/apache/paimon/trino/TrinoTableHandle.java | 10 +-
.../org/apache/paimon/trino/TrinoTypeUtils.java | 21 +++-
.../apache/paimon/trino/catalog/TrinoCatalog.java | 7 +-
.../org/apache/paimon/trino/TestTrinoITCase.java | 4 +-
8 files changed, 201 insertions(+), 91 deletions(-)
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/InternalPaimonConnectorFactory.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/InternalPaimonConnectorFactory.java
new file mode 100644
index 0000000..56232f0
--- /dev/null
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/InternalPaimonConnectorFactory.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.trino;
+
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.json.JsonModule;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.Tracer;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.manager.FileSystemModule;
+import io.trino.hdfs.HdfsModule;
+import io.trino.hdfs.authentication.HdfsAuthenticationModule;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
+import
io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
+import io.trino.plugin.hive.NodeVersion;
+import io.trino.plugin.hive.orc.OrcReaderConfig;
+import io.trino.spi.classloader.ThreadContextClassLoader;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.type.TypeManager;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Internal factory to create connector. */
+public class InternalPaimonConnectorFactory {
+
+ private InternalPaimonConnectorFactory() {}
+
+ public static Connector createConnector(
+ Map<String, String> config,
+ ConnectorContext context,
+ Optional<TrinoFileSystemFactory> fileSystemFactory,
+ Module module) {
+ ClassLoader classLoader =
InternalPaimonConnectorFactory.class.getClassLoader();
+ try (ThreadContextClassLoader ignored = new
ThreadContextClassLoader(classLoader)) {
+ Bootstrap app =
+ new Bootstrap(
+ new JsonModule(),
+ new TrinoModule(config),
+ new HdfsModule(),
+ new HdfsAuthenticationModule(),
+ // bind the trino file system module
+ fileSystemFactory
+ .map(
+ factory ->
+ (Module)
+ binder ->
+
binder.bind(
+
TrinoFileSystemFactory
+
.class)
+
.toInstance(factory))
+ .orElseGet(FileSystemModule::new),
+ binder -> {
+ binder.bind(NodeVersion.class)
+ .toInstance(
+ new NodeVersion(
+
context.getNodeManager()
+
.getCurrentNode()
+
.getVersion()));
+
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+ binder.bind(OpenTelemetry.class)
+
.toInstance(context.getOpenTelemetry());
+
binder.bind(Tracer.class).toInstance(context.getTracer());
+ binder.bind(OrcReaderConfig.class)
+ .toInstance(new OrcReaderConfig());
+ },
+ module);
+
+ Injector injector =
+ app.doNotInitializeLogging()
+ .setRequiredConfigurationProperties(Map.of())
+ .setOptionalConfigurationProperties(config)
+ .initialize();
+
+ TrinoMetadata trinoMetadata =
injector.getInstance(TrinoMetadataFactory.class).create();
+ TrinoSplitManager trinoSplitManager =
injector.getInstance(TrinoSplitManager.class);
+ TrinoPageSourceProvider trinoPageSourceProvider =
+ injector.getInstance(TrinoPageSourceProvider.class);
+ TrinoSessionProperties trinoSessionProperties =
+ injector.getInstance(TrinoSessionProperties.class);
+ TrinoTableOptions trinoTableOptions =
injector.getInstance(TrinoTableOptions.class);
+
+ return new TrinoConnector(
+ new ClassLoaderSafeConnectorMetadata(trinoMetadata,
classLoader),
+ new
ClassLoaderSafeConnectorSplitManager(trinoSplitManager, classLoader),
+ new ClassLoaderSafeConnectorPageSourceProvider(
+ trinoPageSourceProvider, classLoader),
+ trinoTableOptions,
+ trinoSessionProperties);
+ }
+ }
+}
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java
index 6bbb64a..9097247 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java
@@ -20,7 +20,9 @@ package org.apache.paimon.trino;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.transaction.IsolationLevel;
@@ -33,22 +35,22 @@ import static java.util.Objects.requireNonNull;
/** Trino {@link Connector}. */
public class TrinoConnector implements Connector {
- private final TrinoMetadata trinoMetadata;
- private final TrinoSplitManager trinoSplitManager;
- private final TrinoPageSourceProvider trinoPageSourceProvider;
+ private final ConnectorMetadata trinoMetadata;
+ private final ConnectorSplitManager trinoSplitManager;
+ private final ConnectorPageSourceProvider trinoPageSourceProvider;
private final List<PropertyMetadata<?>> tableProperties;
private final List<PropertyMetadata<?>> sessionProperties;
public TrinoConnector(
- TrinoMetadata trinoMetadata,
- TrinoSplitManager trinoSplitManager,
- TrinoPageSourceProvider trinoPageSourceProvider,
+ ConnectorMetadata trinoMetadata,
+ ConnectorSplitManager trinoSplitManager,
+ ConnectorPageSourceProvider trinoPageSourceProvider,
TrinoTableOptions trinoTableOptions,
TrinoSessionProperties trinoSessionProperties) {
- this.trinoMetadata = requireNonNull(trinoMetadata, "jmxMetadata is
null");
- this.trinoSplitManager = requireNonNull(trinoSplitManager,
"jmxSplitManager is null");
+ this.trinoMetadata = requireNonNull(trinoMetadata, "trinoMetadata is
null");
+ this.trinoSplitManager = requireNonNull(trinoSplitManager,
"trinoSplitManager is null");
this.trinoPageSourceProvider =
- requireNonNull(trinoPageSourceProvider, "jmxRecordSetProvider
is null");
+ requireNonNull(trinoPageSourceProvider,
"trinoRecordSetProvider is null");
this.tableProperties = trinoTableOptions.getTableProperties();
this.sessionProperties = trinoSessionProperties.getSessionProperties();
}
@@ -56,32 +58,23 @@ public class TrinoConnector implements Connector {
@Override
public ConnectorTransactionHandle beginTransaction(
IsolationLevel isolationLevel, boolean readOnly, boolean
autoCommit) {
- return beginTransactionBase(isolationLevel, readOnly);
+ checkConnectorSupports(READ_COMMITTED, isolationLevel);
+ return TrinoTransactionHandle.INSTANCE;
}
@Override
public ConnectorMetadata getMetadata(
ConnectorSession session, ConnectorTransactionHandle
transactionHandle) {
- return getMetadataBase(transactionHandle);
- }
-
- protected ConnectorTransactionHandle beginTransactionBase(
- IsolationLevel isolationLevel, boolean readOnly) {
- checkConnectorSupports(READ_COMMITTED, isolationLevel);
- return TrinoTransactionHandle.INSTANCE;
- }
-
- protected TrinoMetadata getMetadataBase(ConnectorTransactionHandle
transactionHandle) {
return trinoMetadata;
}
@Override
- public TrinoSplitManager getSplitManager() {
+ public ConnectorSplitManager getSplitManager() {
return trinoSplitManager;
}
@Override
- public TrinoPageSourceProvider getPageSourceProvider() {
+ public ConnectorPageSourceProvider getPageSourceProvider() {
return trinoPageSourceProvider;
}
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
index ea26016..773e573 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
@@ -18,28 +18,31 @@
package org.apache.paimon.trino;
-import com.google.inject.Injector;
+import com.google.inject.Binder;
import com.google.inject.Module;
-import io.airlift.bootstrap.Bootstrap;
-import io.airlift.json.JsonModule;
-import io.opentelemetry.api.OpenTelemetry;
-import io.opentelemetry.api.trace.Tracer;
-import io.trino.filesystem.manager.FileSystemModule;
-import io.trino.hdfs.HdfsModule;
-import io.trino.hdfs.authentication.HdfsAuthenticationModule;
-import io.trino.plugin.hive.NodeVersion;
-import io.trino.plugin.hive.orc.OrcReaderConfig;
-import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorFactory;
-import io.trino.spi.type.TypeManager;
+import java.lang.reflect.InvocationTargetException;
import java.util.Map;
+import java.util.Optional;
+
+import static
org.apache.paimon.shade.guava30.com.google.common.base.Throwables.throwIfUnchecked;
/** Trino {@link ConnectorFactory}. */
public class TrinoConnectorFactory implements ConnectorFactory {
+ private final Class<? extends Module> module;
+
+ public TrinoConnectorFactory() {
+ this(EmptyModule.class);
+ }
+
+ public TrinoConnectorFactory(Class<? extends Module> module) {
+ this.module = module;
+ }
+
@Override
public String getName() {
return "paimon";
@@ -48,53 +51,33 @@ public class TrinoConnectorFactory implements
ConnectorFactory {
@Override
public Connector create(
String catalogName, Map<String, String> config, ConnectorContext
context) {
-
- try (ThreadContextClassLoader ignored =
- new
ThreadContextClassLoader(TrinoConnectorFactory.class.getClassLoader())) {
- Bootstrap app = new Bootstrap(modules(catalogName, config,
context));
-
- Injector injector =
- app.doNotInitializeLogging()
- .setRequiredConfigurationProperties(Map.of())
- .setOptionalConfigurationProperties(config)
- .initialize();
-
- TrinoMetadata trinoMetadata =
injector.getInstance(TrinoMetadataFactory.class).create();
- TrinoSplitManager trinoSplitManager =
injector.getInstance(TrinoSplitManager.class);
- TrinoPageSourceProvider trinoPageSourceProvider =
- injector.getInstance(TrinoPageSourceProvider.class);
- TrinoSessionProperties trinoSessionProperties =
- injector.getInstance(TrinoSessionProperties.class);
- TrinoTableOptions trinoTableOptions =
injector.getInstance(TrinoTableOptions.class);
-
- return new TrinoConnector(
- trinoMetadata,
- trinoSplitManager,
- trinoPageSourceProvider,
- trinoTableOptions,
- trinoSessionProperties);
+ ClassLoader classLoader = context.duplicatePluginClassLoader();
+ try {
+ Class<?> moduleClass =
classLoader.loadClass(Module.class.getName());
+ Object moduleInstance =
+
classLoader.loadClass(module.getName()).getConstructor().newInstance();
+ return (Connector)
+ classLoader
+
.loadClass(InternalPaimonConnectorFactory.class.getName())
+ .getMethod(
+ "createConnector",
+ Map.class,
+ ConnectorContext.class,
+ Optional.class,
+ moduleClass)
+ .invoke(null, config, context, Optional.empty(),
moduleInstance);
+ } catch (InvocationTargetException e) {
+ Throwable targetException = e.getTargetException();
+ throwIfUnchecked(targetException);
+ throw new RuntimeException(targetException);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
}
}
- protected Module[] modules(
- String catalogName, Map<String, String> config, ConnectorContext
context) {
- return new Module[] {
- new JsonModule(),
- new TrinoModule(config),
- new HdfsModule(),
- new HdfsAuthenticationModule(),
- // bind the trino file system module
- new FileSystemModule(),
- binder -> {
- binder.bind(NodeVersion.class)
- .toInstance(
- new NodeVersion(
-
context.getNodeManager().getCurrentNode().getVersion()));
-
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
-
binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry());
- binder.bind(Tracer.class).toInstance(context.getTracer());
- binder.bind(OrcReaderConfig.class).toInstance(new
OrcReaderConfig());
- }
- };
+ /** Empty module for paimon connector factory. */
+ public static class EmptyModule implements Module {
+ @Override
+ public void configure(Binder binder) {}
}
}
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
index 5510128..e2244b5 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
@@ -121,7 +121,7 @@ public class TrinoPageSourceProvider implements
ConnectorPageSourceProvider {
List<ColumnHandle> columns,
OptionalLong limit) {
RowType rowType = table.rowType();
- List<String> fieldNames = FieldNameUtils.fieldNames(rowType);
+ List<String> fieldNames = rowType.getFieldNames();
List<String> projectedFields =
columns.stream()
.map(TrinoColumnHandle.class::cast)
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
index 30c03af..bc5d025 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
@@ -164,13 +164,15 @@ public class TrinoTableHandle implements
ConnectorTableHandle {
public TrinoColumnHandle columnHandle(TrinoCatalog catalog, String field) {
Table paimonTable = table(catalog);
- List<String> fieldNames =
FieldNameUtils.fieldNames(paimonTable.rowType());
- int index = fieldNames.indexOf(field);
+ List<String> lowerCaseFieldNames =
FieldNameUtils.fieldNames(paimonTable.rowType());
+ List<String> originFieldNames = paimonTable.rowType().getFieldNames();
+ int index = lowerCaseFieldNames.indexOf(field);
if (index == -1) {
throw new RuntimeException(
- String.format("Cannot find field %s in schema %s", field,
fieldNames));
+ String.format("Cannot find field %s in schema %s", field,
lowerCaseFieldNames));
}
- return TrinoColumnHandle.of(field,
paimonTable.rowType().getTypeAt(index));
+ return TrinoColumnHandle.of(
+ originFieldNames.get(index),
paimonTable.rowType().getTypeAt(index));
}
public TrinoTableHandle copy(TupleDomain<TrinoColumnHandle> filter) {
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTypeUtils.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTypeUtils.java
index 5c7f8d8..da2ddf7 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTypeUtils.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTypeUtils.java
@@ -153,12 +153,29 @@ public class TrinoTypeUtils {
@Override
public Type visit(TimestampType timestampType) {
int precision = timestampType.getPrecision();
- return
io.trino.spi.type.TimestampType.createTimestampType(precision);
+ if (precision <= 3) {
+ return io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
+ } else if (precision <= 6) {
+ return io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
+ } else if (precision <= 9) {
+ return io.trino.spi.type.TimestampType.TIMESTAMP_NANOS;
+ } else {
+ return io.trino.spi.type.TimestampType.TIMESTAMP_PICOS;
+ }
}
@Override
public Type visit(LocalZonedTimestampType localZonedTimestampType) {
- return TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
+ int precision = localZonedTimestampType.getPrecision();
+ if (precision <= 3) {
+ return TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
+ } else if (precision <= 6) {
+ return TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
+ } else if (precision <= 9) {
+ return TimestampWithTimeZoneType.TIMESTAMP_TZ_NANOS;
+ } else {
+ return TimestampWithTimeZoneType.TIMESTAMP_TZ_PICOS;
+ }
}
@Override
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
index c956830..e02abb4 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
@@ -23,7 +23,8 @@ package org.apache.paimon.trino.catalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.options.Options;
@@ -95,7 +96,7 @@ public class TrinoCatalog implements Catalog {
}
@Override
- public Optional<CatalogLock.LockFactory> lockFactory() {
+ public Optional<CatalogLockFactory> lockFactory() {
return current.lockFactory();
}
@@ -173,7 +174,7 @@ public class TrinoCatalog implements Catalog {
}
@Override
- public Optional<CatalogLock.LockContext> lockContext() {
+ public Optional<CatalogLockContext> lockContext() {
return current.lockContext();
}
diff --git
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
index 78fb313..c9ae2f1 100644
---
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
@@ -727,11 +727,11 @@ public abstract class TestTrinoITCase extends
AbstractTestQueryFramework {
assertThat(
sql(
"SELECT boolean, tinyint,
smallint,int,bigint,float,double,char,varchar, date,timestamp_0, "
- + "timestamp_3, timestamp_6,
timestamp_tz, decimal, to_hex(varbinary), array, map, row FROM
paimon.default.t99"))
+ + "timestamp_3, timestamp_6, decimal,
to_hex(varbinary), array, map, row FROM paimon.default.t99"))
.isEqualTo(
"[[true, 1, 1, 1, 1, 1.0, 1.0, char1, varchar1,
1970-01-01, "
+ "2023-09-12T07:54:48,
2023-09-12T07:54:48.001, 2023-09-12T07:54:48.001001, "
- + "2023-09-12T07:54:48.002Z[UTC], 0.10000,
010203, [1, 1, 1], {1=1}, [1, 1]]]");
+ + "0.10000, 010203, [1, 1, 1], {1=1}, [1,
1]]]");
}
@Test