This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9fbc89bbad40ec49b0dbd9b6bb5ea9eb4b53e1f9 Author: Tiewei Fang <[email protected]> AuthorDate: Thu Jul 25 12:28:29 2024 +0800 [enhencement](trino-connector) trino-connector supports push down projection to connectors (#37874) Invoke the `applyProjection` method of connectorMetadata` to push the projection down to the connector. This reduces the amount of data retrieved by the connector and enhances query performance. Projection pushdown is particularly important for the BigQuery connector. --- .../source/TrinoConnectorScanNode.java | 43 ++++++++++------------ 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java index 7afe8eed8c8..2f0085ce7fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java @@ -41,6 +41,7 @@ import org.apache.doris.trinoconnector.TrinoColumnMetadata; import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import io.airlift.concurrent.BoundedExecutor; import io.airlift.concurrent.MoreFutures; import io.airlift.concurrent.Threads; @@ -66,6 +67,9 @@ import io.trino.spi.connector.Constraint; import io.trino.spi.connector.ConstraintApplicationResult; import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.LimitApplicationResult; +import io.trino.spi.connector.ProjectionApplicationResult; +import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.expression.Variable; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.TypeManager; import io.trino.split.BufferingSplitSource; @@ -177,29 +181,22 @@ public class TrinoConnectorScanNode extends FileQueryScanNode { + " after pushing down."); } - // TODO(ftw): push down projection - // Map<String, ColumnHandle> columnHandleMap = source.getTargetTable().getColumnHandleMap(); - // Map<String, ColumnHandle> assignments = Maps.newHashMap(); - // if (source.getTargetTable().getName().equals("customer")) { - // assignments.put("c_custkey", columnHandleMap.get("c_custkey")); - // assignments.put("c_mktsegment", columnHandleMap.get("c_mktsegment")); - // } else if (source.getTargetTable().getName().equals("orders")) { - // assignments.put("o_orderkey", columnHandleMap.get("o_orderkey")); - // assignments.put("o_custkey", columnHandleMap.get("o_custkey")); - // assignments.put("o_orderdate", columnHandleMap.get("o_orderdate")); - // assignments.put("o_shippriority", columnHandleMap.get("o_shippriority")); - // } else if (source.getTargetTable().getName().equals("lineitem")) { - // assignments.put("l_orderkey", columnHandleMap.get("l_orderkey")); - // assignments.put("l_extendedprice", columnHandleMap.get("l_extendedprice")); - // assignments.put("l_discount", columnHandleMap.get("l_discount")); - // assignments.put("l_shipdate", columnHandleMap.get("l_shipdate")); - // } - // Optional<ProjectionApplicationResult<ConnectorTableHandle>> projectionResult - // = connectorMetadata.applyProjection(connectorSession, source.getTrinoConnectorTableHandle(), - // Lists.newArrayList(), assignments); - // if (projectionResult.isPresent()) { - // source.setTrinoConnectorTableHandle(projectionResult.get().getHandle()); - // } + // push down projection + Map<String, ColumnHandle> columnHandleMap = source.getTargetTable().getColumnHandleMap(); + Map<String, ColumnMetadata> columnMetadataMap = source.getTargetTable().getColumnMetadataMap(); + Map<String, ColumnHandle> assignments = Maps.newLinkedHashMap(); + List<ConnectorExpression> projections = Lists.newArrayList(); + for (SlotDescriptor slotDescriptor : desc.getSlots()) { + String colName = slotDescriptor.getColumn().getName(); + assignments.put(colName, columnHandleMap.get(colName)); + projections.add(new Variable(colName, columnMetadataMap.get(colName).getType())); + } + Optional<ProjectionApplicationResult<ConnectorTableHandle>> projectionResult + = connectorMetadata.applyProjection(connectorSession, source.getTrinoConnectorTableHandle(), + projections, assignments); + if (projectionResult.isPresent()) { + source.setTrinoConnectorTableHandle(projectionResult.get().getHandle()); + } } private SplitSource getTrinoSplitSource(Connector connector, Session session, ConnectorTableHandle table, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
