This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 9fbc89bbad40ec49b0dbd9b6bb5ea9eb4b53e1f9
Author: Tiewei Fang <[email protected]>
AuthorDate: Thu Jul 25 12:28:29 2024 +0800

    [enhancement](trino-connector) trino-connector supports push down 
projection to connectors (#37874)
    
    Invoke the `applyProjection` method of `connectorMetadata` to push the
    projection down to the connector. This reduces the amount of data
    retrieved by the connector and enhances query performance.
    
    Projection pushdown is particularly important for the BigQuery
    connector.
---
 .../source/TrinoConnectorScanNode.java             | 43 ++++++++++------------
 1 file changed, 20 insertions(+), 23 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
index 7afe8eed8c8..2f0085ce7fd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
@@ -41,6 +41,7 @@ import org.apache.doris.trinoconnector.TrinoColumnMetadata;
 import com.fasterxml.jackson.databind.Module;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import io.airlift.concurrent.BoundedExecutor;
 import io.airlift.concurrent.MoreFutures;
 import io.airlift.concurrent.Threads;
@@ -66,6 +67,9 @@ import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.ConstraintApplicationResult;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.connector.LimitApplicationResult;
+import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.expression.ConnectorExpression;
+import io.trino.spi.expression.Variable;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.TypeManager;
 import io.trino.split.BufferingSplitSource;
@@ -177,29 +181,22 @@ public class TrinoConnectorScanNode extends 
FileQueryScanNode {
                     + " after pushing down.");
         }
 
-        // TODO(ftw): push down projection
-        // Map<String, ColumnHandle> columnHandleMap = 
source.getTargetTable().getColumnHandleMap();
-        // Map<String, ColumnHandle> assignments = Maps.newHashMap();
-        // if (source.getTargetTable().getName().equals("customer")) {
-        //     assignments.put("c_custkey", columnHandleMap.get("c_custkey"));
-        //     assignments.put("c_mktsegment", 
columnHandleMap.get("c_mktsegment"));
-        // } else if (source.getTargetTable().getName().equals("orders")) {
-        //     assignments.put("o_orderkey", 
columnHandleMap.get("o_orderkey"));
-        //     assignments.put("o_custkey", columnHandleMap.get("o_custkey"));
-        //     assignments.put("o_orderdate", 
columnHandleMap.get("o_orderdate"));
-        //     assignments.put("o_shippriority", 
columnHandleMap.get("o_shippriority"));
-        // } else if (source.getTargetTable().getName().equals("lineitem")) {
-        //     assignments.put("l_orderkey", 
columnHandleMap.get("l_orderkey"));
-        //     assignments.put("l_extendedprice", 
columnHandleMap.get("l_extendedprice"));
-        //     assignments.put("l_discount", 
columnHandleMap.get("l_discount"));
-        //     assignments.put("l_shipdate", 
columnHandleMap.get("l_shipdate"));
-        // }
-        // Optional<ProjectionApplicationResult<ConnectorTableHandle>> 
projectionResult
-        //         = connectorMetadata.applyProjection(connectorSession, 
source.getTrinoConnectorTableHandle(),
-        //         Lists.newArrayList(), assignments);
-        // if (projectionResult.isPresent()) {
-        //     
source.setTrinoConnectorTableHandle(projectionResult.get().getHandle());
-        // }
+        // push down projection
+        Map<String, ColumnHandle> columnHandleMap = 
source.getTargetTable().getColumnHandleMap();
+        Map<String, ColumnMetadata> columnMetadataMap = 
source.getTargetTable().getColumnMetadataMap();
+        Map<String, ColumnHandle> assignments = Maps.newLinkedHashMap();
+        List<ConnectorExpression> projections = Lists.newArrayList();
+        for (SlotDescriptor slotDescriptor : desc.getSlots()) {
+            String colName = slotDescriptor.getColumn().getName();
+            assignments.put(colName, columnHandleMap.get(colName));
+            projections.add(new Variable(colName, 
columnMetadataMap.get(colName).getType()));
+        }
+        Optional<ProjectionApplicationResult<ConnectorTableHandle>> 
projectionResult
+                = connectorMetadata.applyProjection(connectorSession, 
source.getTrinoConnectorTableHandle(),
+                projections, assignments);
+        if (projectionResult.isPresent()) {
+            
source.setTrinoConnectorTableHandle(projectionResult.get().getHandle());
+        }
     }
 
     private SplitSource getTrinoSplitSource(Connector connector, Session 
session, ConnectorTableHandle table,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to