This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fc27d7a5bd0 [Fix](trino-connector) refactor code of
TrinoConnectorExternalTable (#34496)
fc27d7a5bd0 is described below
commit fc27d7a5bd00566ef0427c4f40d689efc20212c1
Author: Tiewei Fang <[email protected]>
AuthorDate: Fri May 24 11:45:42 2024 +0800
[Fix](trino-connector) refactor code of TrinoConnectorExternalTable (#34496)
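1. Fix the issue with the trino-connector accessing DeltaLake:
cache `ConnectorMetadata` and `ConnectorTransactionHandle` in `TrinoSchemaCacheValue` instead of creating them in `getSplits()`.
2. Refactor some code in the Paimon and Trino connector scan nodes and sources.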
---
.../datasource/paimon/source/PaimonScanNode.java | 16 ++-----
.../datasource/paimon/source/PaimonSource.java | 9 ++--
.../TrinoConnectorExternalTable.java | 30 ++++++++----
.../trinoconnector/TrinoSchemaCacheValue.java | 54 +++++++++++++++++++---
.../source/TrinoConnectorScanNode.java | 38 ++++++---------
.../source/TrinoConnectorSource.java | 20 ++++----
6 files changed, 100 insertions(+), 67 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 003ced7ead7..3ab5b3ec657 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -20,12 +20,10 @@ package org.apache.doris.datasource.paimon.source;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
@@ -115,17 +113,9 @@ public class PaimonScanNode extends FileQueryScanNode {
@Override
protected void doInitialize() throws UserException {
- ExternalTable table = (ExternalTable) desc.getTable();
- if (table.isView()) {
- throw new AnalysisException(
- String.format("Querying external view '%s.%s' is not
supported", table.getDbName(),
- table.getName()));
- }
- computeColumnsFilter();
- initBackendPolicy();
- source = new PaimonSource((PaimonExternalTable) table, desc,
columnNameToRange);
+ super.doInitialize();
+ source = new PaimonSource(desc);
Preconditions.checkNotNull(source);
- initSchemaParams();
PaimonPredicateConverter paimonPredicateConverter = new
PaimonPredicateConverter(
source.getPaimonTable().rowType());
predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts);
@@ -330,7 +320,7 @@ public class PaimonScanNode extends FileQueryScanNode {
@Override
public TableIf getTargetTable() {
- return source.getTargetTable();
+ return desc.getTable();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
index 9ac44537e8a..da948d2b063 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
@@ -23,12 +23,10 @@ import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.datasource.property.constants.PaimonProperties;
-import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.paimon.table.Table;
-import java.util.Map;
public class PaimonSource {
private final PaimonExternalTable paimonExtTable;
@@ -36,11 +34,10 @@ public class PaimonSource {
private final TupleDescriptor desc;
- public PaimonSource(PaimonExternalTable table, TupleDescriptor desc,
- Map<String, ColumnRange> columnNameToRange) {
- this.paimonExtTable = table;
- this.originTable = paimonExtTable.getPaimonTable();
+ public PaimonSource(TupleDescriptor desc) {
this.desc = desc;
+ this.paimonExtTable = (PaimonExternalTable) desc.getTable();
+ this.originTable = paimonExtTable.getPaimonTable();
}
public TupleDescriptor getDesc() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java
index 3cba264861e..27f9b8086a9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java
@@ -143,7 +143,8 @@ public class TrinoConnectorExternalTable extends
ExternalTable {
}
Map<String, ColumnMetadata> columnMetadataMap =
columnMetadataMapBuilder.buildOrThrow();
return Optional.of(
- new TrinoSchemaCacheValue(columns, connectorTableHandle,
columnHandleMap, columnMetadataMap));
+ new TrinoSchemaCacheValue(columns, connectorMetadata,
connectorTableHandle, connectorTransactionHandle,
+ columnHandleMap, columnMetadataMap));
}
@Override
@@ -160,11 +161,7 @@ public class TrinoConnectorExternalTable extends
ExternalTable {
return tTableDescriptor;
}
- protected Type trinoConnectorTypeToDorisType(io.trino.spi.type.Type type) {
- return trinoConnectorPrimitiveTypeToDorisType(type);
- }
-
- private Type trinoConnectorPrimitiveTypeToDorisType(io.trino.spi.type.Type
type) {
+ private Type trinoConnectorTypeToDorisType(io.trino.spi.type.Type type) {
if (type instanceof BooleanType) {
return Type.BOOLEAN;
} else if (type instanceof TinyintType) {
@@ -201,19 +198,19 @@ public class TrinoConnectorExternalTable extends
ExternalTable {
TimestampWithTimeZoneType timestampWithTimeZoneType =
(TimestampWithTimeZoneType) type;
return
ScalarType.createDatetimeV2Type(timestampWithTimeZoneType.getPrecision());
} else if (type instanceof io.trino.spi.type.ArrayType) {
- Type elementType = trinoConnectorPrimitiveTypeToDorisType(
+ Type elementType = trinoConnectorTypeToDorisType(
((io.trino.spi.type.ArrayType) type).getElementType());
return ArrayType.create(elementType, true);
} else if (type instanceof io.trino.spi.type.MapType) {
- Type keyType = trinoConnectorPrimitiveTypeToDorisType(
+ Type keyType = trinoConnectorTypeToDorisType(
((io.trino.spi.type.MapType) type).getKeyType());
- Type valueType = trinoConnectorPrimitiveTypeToDorisType(
+ Type valueType = trinoConnectorTypeToDorisType(
((io.trino.spi.type.MapType) type).getValueType());
return new MapType(keyType, valueType, true, true);
} else if (type instanceof RowType) {
ArrayList<StructField> dorisFields = Lists.newArrayList();
for (Field field : ((RowType) type).getFields()) {
- Type childType =
trinoConnectorPrimitiveTypeToDorisType(field.getType());
+ Type childType =
trinoConnectorTypeToDorisType(field.getType());
if (field.getName().isPresent()) {
dorisFields.add(new StructField(field.getName().get(),
childType));
} else {
@@ -233,6 +230,19 @@ public class TrinoConnectorExternalTable extends
ExternalTable {
.orElse(null);
}
+ public ConnectorMetadata getConnectorMetadata() {
+ makeSureInitialized();
+ Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+ return schemaCacheValue.map(value -> ((TrinoSchemaCacheValue)
value).getConnectorMetadata()).orElse(null);
+ }
+
+ public ConnectorTransactionHandle getConnectorTransactionHandle() {
+ makeSureInitialized();
+ Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+ return schemaCacheValue.map(value -> ((TrinoSchemaCacheValue)
value).getConnectorTransactionHandle())
+ .orElse(null);
+ }
+
public Map<String, ColumnHandle> getColumnHandleMap() {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java
index dc629190cbd..43bbe76c3b3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java
@@ -22,27 +22,69 @@ import org.apache.doris.datasource.SchemaCacheValue;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorTableHandle;
-import lombok.Getter;
-import lombok.Setter;
+import io.trino.spi.connector.ConnectorTransactionHandle;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-@Getter
-@Setter
public class TrinoSchemaCacheValue extends SchemaCacheValue {
-
+ private ConnectorMetadata connectorMetadata;
private Optional<ConnectorTableHandle> connectorTableHandle;
+ private ConnectorTransactionHandle connectorTransactionHandle;
private Map<String, ColumnHandle> columnHandleMap;
private Map<String, ColumnMetadata> columnMetadataMap;
- public TrinoSchemaCacheValue(List<Column> schema,
Optional<ConnectorTableHandle> connectorTableHandle,
+ public TrinoSchemaCacheValue(List<Column> schema, ConnectorMetadata
connectorMetadata,
+ Optional<ConnectorTableHandle> connectorTableHandle,
ConnectorTransactionHandle connectorTransactionHandle,
Map<String, ColumnHandle> columnHandleMap, Map<String,
ColumnMetadata> columnMetadataMap) {
super(schema);
+ this.connectorMetadata = connectorMetadata;
+ this.connectorTableHandle = connectorTableHandle;
+ this.connectorTransactionHandle = connectorTransactionHandle;
+ this.columnHandleMap = columnHandleMap;
+ this.columnMetadataMap = columnMetadataMap;
+ }
+
+ public ConnectorMetadata getConnectorMetadata() {
+ return connectorMetadata;
+ }
+
+ public Optional<ConnectorTableHandle> getConnectorTableHandle() {
+ return connectorTableHandle;
+ }
+
+ public ConnectorTransactionHandle getConnectorTransactionHandle() {
+ return connectorTransactionHandle;
+ }
+
+ public Map<String, ColumnHandle> getColumnHandleMap() {
+ return columnHandleMap;
+ }
+
+ public Map<String, ColumnMetadata> getColumnMetadataMap() {
+ return columnMetadataMap;
+ }
+
+ public void setConnectorMetadata(ConnectorMetadata connectorMetadata) {
+ this.connectorMetadata = connectorMetadata;
+ }
+
+ public void setConnectorTableHandle(Optional<ConnectorTableHandle>
connectorTableHandle) {
this.connectorTableHandle = connectorTableHandle;
+ }
+
+ public void setConnectorTransactionHandle(ConnectorTransactionHandle
connectorTransactionHandle) {
+ this.connectorTransactionHandle = connectorTransactionHandle;
+ }
+
+ public void setColumnHandleMap(Map<String, ColumnHandle> columnHandleMap) {
this.columnHandleMap = columnHandleMap;
+ }
+
+ public void setColumnMetadataMap(Map<String, ColumnMetadata>
columnMetadataMap) {
this.columnMetadataMap = columnMetadataMap;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
index 9c61d54614a..2b09e30026c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
@@ -27,7 +27,6 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
-import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorPluginLoader;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
@@ -66,13 +65,11 @@ import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
-import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.predicate.TupleDomain;
-import io.trino.spi.transaction.IsolationLevel;
import io.trino.spi.type.TypeManager;
import io.trino.split.BufferingSplitSource;
import io.trino.split.ConnectorAwareSplitSource;
@@ -106,17 +103,8 @@ public class TrinoConnectorScanNode extends
FileQueryScanNode {
@Override
protected void doInitialize() throws UserException {
- TrinoConnectorExternalTable table = (TrinoConnectorExternalTable)
desc.getTable();
- if (table.isView()) {
- throw new AnalysisException(
- String.format("Querying external view '%s.%s' is not
supported", table.getDbName(),
- table.getName()));
- }
-
- computeColumnsFilter();
- initBackendPolicy();
- source = new TrinoConnectorSource(desc, table);
- initSchemaParams();
+ super.doInitialize();
+ source = new TrinoConnectorSource(desc);
convertPredicate();
}
@@ -144,12 +132,9 @@ public class TrinoConnectorScanNode extends
FileQueryScanNode {
public List<Split> getSplits() throws UserException {
// 1. Get necessary objects
Connector connector = source.getConnector();
- ConnectorTransactionHandle connectorTransactionHandle =
connector.beginTransaction(
- IsolationLevel.READ_UNCOMMITTED, true, true);
- source.setConnectorTransactionHandle(connectorTransactionHandle);
+ connectorMetadata = source.getConnectorMetadata();
ConnectorSession connectorSession =
source.getTrinoSession().toConnectorSession(source.getCatalogHandle());
- connectorMetadata = connector.getMetadata(connectorSession,
connectorTransactionHandle);
// 2. Begin query
connectorMetadata.beginQuery(connectorSession);
applyPushDown(connectorSession);
@@ -157,7 +142,7 @@ public class TrinoConnectorScanNode extends
FileQueryScanNode {
// 3. get splitSource
List<Split> splits = Lists.newArrayList();
try (SplitSource splitSource = getTrinoSplitSource(connector,
source.getTrinoSession(),
- connectorTransactionHandle,
source.getTrinoConnectorTableHandle(), DynamicFilter.EMPTY)) {
+ source.getTrinoConnectorTableHandle(), DynamicFilter.EMPTY)) {
// 4. get trino.Splits and convert it to doris.Splits
while (!splitSource.isFinished()) {
for (io.trino.metadata.Split split :
getNextSplitBatch(splitSource)) {
@@ -165,6 +150,10 @@ public class TrinoConnectorScanNode extends
FileQueryScanNode {
}
}
}
+
+ // 4. Clear query
+ // It is necessary for hive connector
+ connectorMetadata.cleanupQuery(connectorSession);
return splits;
}
@@ -216,8 +205,7 @@ public class TrinoConnectorScanNode extends
FileQueryScanNode {
// }
}
- private SplitSource getTrinoSplitSource(Connector connector, Session
session,
- ConnectorTransactionHandle connectorTransactionHandle,
ConnectorTableHandle table,
+ private SplitSource getTrinoSplitSource(Connector connector, Session
session, ConnectorTableHandle table,
DynamicFilter dynamicFilter) {
ConnectorSplitManager splitManager = connector.getSplitManager();
@@ -227,8 +215,8 @@ public class TrinoConnectorScanNode extends
FileQueryScanNode {
ConnectorSession connectorSession =
session.toConnectorSession(source.getCatalogHandle());
// Constraint is not used by Hive/BigQuery Connector
- ConnectorSplitSource connectorSplitSource =
splitManager.getSplits(connectorTransactionHandle, connectorSession,
- table, dynamicFilter, constraint);
+ ConnectorSplitSource connectorSplitSource =
splitManager.getSplits(source.getConnectorTransactionHandle(),
+ connectorSession, table, dynamicFilter, constraint);
SplitSource splitSource = new
ConnectorAwareSplitSource(source.getCatalogHandle(), connectorSplitSource);
if (this.minScheduleSplitBatchSize > 1) {
@@ -386,7 +374,9 @@ public class TrinoConnectorScanNode extends
FileQueryScanNode {
@Override
public TableIf getTargetTable() {
- return source.getTargetTable();
+ // can not use `source.getTargetTable()`
+ // because source is null when called getTargetTable
+ return desc.getTable();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java
index 85e36517bae..20dcf996595 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java
@@ -27,6 +27,7 @@ import io.trino.Session;
import io.trino.connector.ConnectorName;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
@@ -40,16 +41,19 @@ public class TrinoConnectorSource {
private final ConnectorName connectorName;
private ConnectorTransactionHandle connectorTransactionHandle;
private ConnectorTableHandle trinoConnectorTableHandle;
+ private ConnectorMetadata connectorMetadata;
- public TrinoConnectorSource(TupleDescriptor desc,
TrinoConnectorExternalTable table) {
+ public TrinoConnectorSource(TupleDescriptor desc) {
this.desc = desc;
- this.trinoConnectorExtTable = table;
- this.trinoConnectorExternalCatalog = (TrinoConnectorExternalCatalog)
table.getCatalog();
+ this.trinoConnectorExtTable = (TrinoConnectorExternalTable)
desc.getTable();
+ this.trinoConnectorExternalCatalog = (TrinoConnectorExternalCatalog)
trinoConnectorExtTable.getCatalog();
this.catalogHandle =
trinoConnectorExternalCatalog.getTrinoCatalogHandle();
- this.trinoConnectorTableHandle = table.getConnectorTableHandle();
+ this.trinoConnectorTableHandle =
trinoConnectorExtTable.getConnectorTableHandle();
+ this.connectorMetadata = trinoConnectorExtTable.getConnectorMetadata();
+ this.connectorTransactionHandle =
trinoConnectorExtTable.getConnectorTransactionHandle();
this.trinoSession = trinoConnectorExternalCatalog.getTrinoSession();
- this.connector = ((TrinoConnectorExternalCatalog)
table.getCatalog()).getConnector();
- this.connectorName = ((TrinoConnectorExternalCatalog)
table.getCatalog()).getConnectorName();
+ this.connector = trinoConnectorExternalCatalog.getConnector();
+ this.connectorName = trinoConnectorExternalCatalog.getConnectorName();
}
public TupleDescriptor getDesc() {
@@ -88,8 +92,8 @@ public class TrinoConnectorSource {
return connectorName;
}
- public void setConnectorTransactionHandle(ConnectorTransactionHandle
connectorTransactionHandle) {
- this.connectorTransactionHandle = connectorTransactionHandle;
+ public ConnectorMetadata getConnectorMetadata() {
+ return connectorMetadata;
}
public void setTrinoConnectorTableHandle(ConnectorTableHandle
trinoConnectorExtTableHandle) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]