This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-paimon-presto.git
The following commit(s) were added to refs/heads/main by this push:
new 304ec21 add presto filter pushdown (#18)
304ec21 is described below
commit 304ec2168f566fabac5b2432be9c9d74ee21ad48
Author: chendapao <[email protected]>
AuthorDate: Mon Nov 27 11:13:18 2023 +0800
add presto filter pushdown (#18)
---
.../org/apache/paimon/presto/PrestoConnector.java | 1 +
.../org/apache/paimon/presto/PrestoConnector.java | 12 +-
paimon-presto-common/pom.xml | 8 +-
.../org/apache/paimon/presto/PaimonModule.java | 1 +
.../paimon/presto/PrestoComputePushdown.java | 202 +++++++++++++++++++++
.../org/apache/paimon/presto/PrestoConnector.java | 12 +-
.../apache/paimon/presto/PrestoConnectorBase.java | 25 +++
.../paimon/presto/PrestoPlanOptimizerProvider.java | 57 ++++++
.../paimon/presto/TestPrestoComputePushdown.java | 200 ++++++++++++++++++++
9 files changed, 512 insertions(+), 6 deletions(-)
diff --git
a/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoConnector.java
b/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoConnector.java
index 8567cbb..51d8f67 100644
---
a/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoConnector.java
+++
b/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoConnector.java
@@ -34,6 +34,7 @@ public class PrestoConnector extends PrestoConnectorBase {
PrestoSplitManager prestoSplitManager,
PrestoPageSourceProvider prestoPageSourceProvider,
PrestoMetadata prestoMetadata) {
+ // The Presto 0.236 SPI is too old to support compute push-down, so it is not implemented here yet.
super(transactionManager, prestoSplitManager,
prestoPageSourceProvider, prestoMetadata);
this.transactionManager = transactionManager;
}
diff --git
a/paimon-presto-0.268/src/main/java/org/apache/paimon/presto/PrestoConnector.java
b/paimon-presto-0.268/src/main/java/org/apache/paimon/presto/PrestoConnector.java
index 8567cbb..0fa0f33 100644
---
a/paimon-presto-0.268/src/main/java/org/apache/paimon/presto/PrestoConnector.java
+++
b/paimon-presto-0.268/src/main/java/org/apache/paimon/presto/PrestoConnector.java
@@ -23,6 +23,8 @@ import
com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import javax.inject.Inject;
+import java.util.Optional;
+
/** Presto {@link Connector}. */
public class PrestoConnector extends PrestoConnectorBase {
@@ -33,8 +35,14 @@ public class PrestoConnector extends PrestoConnectorBase {
PrestoTransactionManager transactionManager,
PrestoSplitManager prestoSplitManager,
PrestoPageSourceProvider prestoPageSourceProvider,
- PrestoMetadata prestoMetadata) {
- super(transactionManager, prestoSplitManager,
prestoPageSourceProvider, prestoMetadata);
+ PrestoMetadata prestoMetadata,
+ PrestoPlanOptimizerProvider prestoPlanOptimizerProvider) {
+ super(
+ transactionManager,
+ prestoSplitManager,
+ prestoPageSourceProvider,
+ prestoMetadata,
+ Optional.of(prestoPlanOptimizerProvider));
this.transactionManager = transactionManager;
}
diff --git a/paimon-presto-common/pom.xml b/paimon-presto-common/pom.xml
index d4d8cd5..b84fa81 100644
--- a/paimon-presto-common/pom.xml
+++ b/paimon-presto-common/pom.xml
@@ -57,6 +57,12 @@ under the License.
<version>${hive.apache.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-hive</artifactId>
+ <version>${presto.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
@@ -88,14 +94,12 @@ under the License.
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<version>${presto.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-common</artifactId>
<version>${presto.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
diff --git
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonModule.java
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonModule.java
index 5dee0a8..6cbb738 100644
---
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonModule.java
+++
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonModule.java
@@ -71,6 +71,7 @@ public class PaimonModule implements Module {
binder.bind(RowExpressionService.class).toInstance(rowExpressionService);
binder.bind(Options.class).toInstance(Options.fromMap(config));
binder.bind(PrestoTransactionManager.class).in(Scopes.SINGLETON);
+ binder.bind(PrestoPlanOptimizerProvider.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(PaimonConfig.class);
}
diff --git
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
new file mode 100644
index 0000000..a2da50c
--- /dev/null
+++
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.presto;
+
+import com.facebook.presto.common.Subfield;
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.hive.SubfieldExtractor;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorPlanOptimizer;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.TableHandle;
+import com.facebook.presto.spi.VariableAllocator;
+import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.plan.FilterNode;
+import com.facebook.presto.spi.plan.PlanNode;
+import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
+import com.facebook.presto.spi.plan.PlanVisitor;
+import com.facebook.presto.spi.plan.TableScanNode;
+import com.facebook.presto.spi.relation.CallExpression;
+import com.facebook.presto.spi.relation.DomainTranslator;
+import com.facebook.presto.spi.relation.RowExpression;
+import com.facebook.presto.spi.relation.RowExpressionService;
+import com.facebook.presto.spi.relation.VariableReferenceExpression;
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/** A {@link ConnectorPlanOptimizer} that pushes filter predicates sitting on table scans into the {@link PrestoTableHandle}. */
+public class PrestoComputePushdown implements ConnectorPlanOptimizer {
+
+ private final StandardFunctionResolution functionResolution;
+ private final RowExpressionService rowExpressionService;
+
+    /**
+     * Creates the push-down optimizer.
+     *
+     * @param functionResolution function resolution handed to the {@link SubfieldExtractor} when a
+     *     filter predicate is decomposed; must not be null
+     * @param rowExpressionService supplies the domain translator and the expression optimizer;
+     *     must not be null
+     * @throws NullPointerException if either argument is null
+     */
+    public PrestoComputePushdown(
+            StandardFunctionResolution functionResolution,
+            RowExpressionService rowExpressionService) {
+        // Validate in declaration order, then store.
+        requireNonNull(functionResolution, "functionResolution is null");
+        requireNonNull(rowExpressionService, "rowExpressionService is null");
+        this.functionResolution = functionResolution;
+        this.rowExpressionService = rowExpressionService;
+    }
+
+    /**
+     * Rewrites the given subplan by walking it with a fresh {@link Visitor}.
+     *
+     * @param maxSubplan root of the subplan to rewrite
+     * @param session connector session passed on to the visitor
+     * @param variableAllocator not used by this optimizer
+     * @param idAllocator plan node id allocator passed on to the visitor
+     * @return whatever the visitor returns for {@code maxSubplan}
+     */
+    @Override
+    public PlanNode optimize(
+            PlanNode maxSubplan,
+            ConnectorSession session,
+            VariableAllocator variableAllocator,
+            PlanNodeIdAllocator idAllocator) {
+        Visitor pushdownVisitor = new Visitor(session, idAllocator);
+        return maxSubplan.accept(pushdownVisitor, null);
+    }
+
+ private class Visitor extends PlanVisitor<PlanNode, Void> {
+
+ private final ConnectorSession session;
+ private final PlanNodeIdAllocator idAllocator;
+
+        /**
+         * Creates a visitor bound to one optimization run.
+         *
+         * @param session connector session used when the filter predicate is decomposed
+         * @param idAllocator plan node id allocator of the current run
+         * @throws NullPointerException if either argument is null
+         */
+        public Visitor(ConnectorSession session, PlanNodeIdAllocator idAllocator) {
+            requireNonNull(session, "session is null");
+            requireNonNull(idAllocator, "idAllocator is null");
+            this.session = session;
+            this.idAllocator = idAllocator;
+        }
+
+        /**
+         * Collects the column handles referenced by an expression.
+         *
+         * <p>A variable resolves to its handle in {@code assignments}, or to an empty result when
+         * it has no entry. A call expression yields the concatenation of whatever its arguments
+         * resolve to (possibly an empty list). Every other expression kind yields an empty
+         * {@link Optional}.
+         *
+         * @param expression expression to inspect
+         * @param assignments variable-to-column mapping of the table scan
+         * @return the referenced column handles, in argument order and without de-duplication
+         */
+        private Optional<List<ColumnHandle>> extractColumns(
+                RowExpression expression,
+                Map<VariableReferenceExpression, ColumnHandle> assignments) {
+
+            if (expression instanceof VariableReferenceExpression) {
+                // A variable without an assignment contributes nothing.
+                ColumnHandle mapped = assignments.get(expression);
+                return Optional.ofNullable(mapped)
+                        .map(handle -> Collections.<ColumnHandle>singletonList(handle));
+            }
+
+            if (!(expression instanceof CallExpression)) {
+                return Optional.empty();
+            }
+
+            List<ColumnHandle> collected = new ArrayList<>();
+            for (RowExpression argument : ((CallExpression) expression).getArguments()) {
+                Optional<List<ColumnHandle>> nested = extractColumns(argument, assignments);
+                if (nested.isPresent()) {
+                    collected.addAll(nested.get());
+                }
+            }
+            return Optional.of(collected);
+        }
+
+        /**
+         * Default traversal: visits every source of {@code node} and rebuilds the node only when
+         * at least one source came back as a different instance.
+         *
+         * @param node plan node to traverse
+         * @param context unused
+         * @return {@code node} itself when no source changed, otherwise a copy with the new sources
+         */
+        @Override
+        public PlanNode visitPlan(PlanNode node, Void context) {
+            ImmutableList.Builder<PlanNode> rewrittenSources = ImmutableList.builder();
+            boolean anyRewritten = false;
+            for (PlanNode source : node.getSources()) {
+                PlanNode visited = source.accept(this, null);
+                // Identity comparison: an untouched subtree is returned as the same object.
+                anyRewritten |= visited != source;
+                rewrittenSources.add(visited);
+            }
+            return anyRewritten ? node.replaceChildren(rewrittenSources.build()) : node;
+        }
+
+        /**
+         * Pushes the predicate of a filter that sits directly on a table scan into the scan's
+         * {@link PrestoTableHandle}.
+         *
+         * <p>The predicate is decomposed into a tuple domain keyed by column handle, and the
+         * columns referenced by the predicate are collected. Both are stored in a new table handle
+         * and layout handle, which replace the ones on the table scan. The filter node itself is
+         * kept with its original predicate on top of the new scan.
+         *
+         * <p>When the filter's source is not a table scan, the traversal continues into the
+         * filter's sources so that a filter-over-scan pair deeper in the subplan is still
+         * rewritten.
+         *
+         * @param filter filter node being visited
+         * @param context unused
+         * @return the rewritten filter, or the result of the default traversal when the source is
+         *     not a table scan
+         */
+        @Override
+        public PlanNode visitFilter(FilterNode filter, Void context) {
+            if (!(filter.getSource() instanceof TableScanNode)) {
+                // Returning the filter as-is here would stop the walk and leave any nested
+                // filter-over-scan untouched, so fall back to the generic child traversal.
+                return visitPlan(filter, context);
+            }
+
+            TableScanNode tableScan = (TableScanNode) filter.getSource();
+
+            // Index the scan's column handles by variable name; the lookup below uses the
+            // subfield root name as the key.
+            Map<String, PrestoColumnHandle> nameToColumnHandlesMapping =
+                    tableScan.getAssignments().entrySet().stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            e -> e.getKey().getName(),
+                                            e -> (PrestoColumnHandle) e.getValue()));
+
+            RowExpression filterPredicate = filter.getPredicate();
+
+            DomainTranslator.ExtractionResult<Subfield> decomposedFilter =
+                    rowExpressionService
+                            .getDomainTranslator()
+                            .fromPredicate(
+                                    session,
+                                    filterPredicate,
+                                    new SubfieldExtractor(
+                                                    functionResolution,
+                                                    rowExpressionService.getExpressionOptimizer(),
+                                                    session)
+                                            .toColumnExtractor());
+
+            // Build the Paimon predicate keyed by column handle. Only whole-column subfields
+            // (empty path) keep their name; nested subfields are mapped to null.
+            // NOTE(review): presumably TupleDomain#transform drops entries mapped to null - confirm.
+            TupleDomain<PrestoColumnHandle> entireColumnDomain =
+                    decomposedFilter
+                            .getTupleDomain()
+                            .transform(
+                                    subfield ->
+                                            subfield.getPath().isEmpty()
+                                                    ? subfield.getRootName()
+                                                    : null)
+                            .transform(nameToColumnHandlesMapping::get);
+
+            // Collect the columns referenced by the predicate.
+            Map<VariableReferenceExpression, ColumnHandle> assignments = tableScan.getAssignments();
+            Optional<List<ColumnHandle>> projectedColumns =
+                    extractColumns(filterPredicate, assignments);
+
+            // Build a new table handle that carries the pushed-down filter and columns.
+            PrestoTableHandle oldPrestoTableHandle =
+                    (PrestoTableHandle) tableScan.getTable().getConnectorHandle();
+            PrestoTableHandle newPrestoTableHandle =
+                    new PrestoTableHandle(
+                            oldPrestoTableHandle.getSchemaName(),
+                            oldPrestoTableHandle.getTableName(),
+                            oldPrestoTableHandle.getSerializedTable(),
+                            entireColumnDomain,
+                            projectedColumns);
+
+            PrestoTableLayoutHandle newLayoutHandle =
+                    new PrestoTableLayoutHandle(
+                            newPrestoTableHandle, tableScan.getCurrentConstraint());
+
+            // Same scan node (id, outputs, assignments, constraints), only the table handle
+            // and layout are swapped.
+            TableScanNode newTableScan =
+                    new TableScanNode(
+                            tableScan.getSourceLocation(),
+                            tableScan.getId(),
+                            new TableHandle(
+                                    tableScan.getTable().getConnectorId(),
+                                    newPrestoTableHandle,
+                                    tableScan.getTable().getTransaction(),
+                                    Optional.of(newLayoutHandle)),
+                            tableScan.getOutputVariables(),
+                            tableScan.getAssignments(),
+                            tableScan.getCurrentConstraint(),
+                            tableScan.getEnforcedConstraint());
+
+            // The original predicate stays on the filter node above the new scan.
+            return new FilterNode(
+                    filter.getSourceLocation(), filter.getId(), newTableScan, filterPredicate);
+        }
+ }
+}
diff --git
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnector.java
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnector.java
index a9d64c9..cbaf0fb 100644
---
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnector.java
+++
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnector.java
@@ -25,6 +25,8 @@ import
com.facebook.presto.spi.connector.EmptyConnectorCommitHandle;
import javax.inject.Inject;
+import java.util.Optional;
+
/** Presto {@link Connector}. */
public class PrestoConnector extends PrestoConnectorBase {
@@ -35,8 +37,14 @@ public class PrestoConnector extends PrestoConnectorBase {
PrestoTransactionManager transactionManager,
PrestoSplitManager prestoSplitManager,
PrestoPageSourceProvider prestoPageSourceProvider,
- PrestoMetadata prestoMetadata) {
- super(transactionManager, prestoSplitManager,
prestoPageSourceProvider, prestoMetadata);
+ PrestoMetadata prestoMetadata,
+ PrestoPlanOptimizerProvider prestoPlanOptimizerProvider) {
+ super(
+ transactionManager,
+ prestoSplitManager,
+ prestoPageSourceProvider,
+ prestoMetadata,
+ Optional.of(prestoPlanOptimizerProvider));
this.transactionManager = transactionManager;
}
diff --git
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorBase.java
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorBase.java
index 8d840a4..0eab77c 100644
---
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorBase.java
+++
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorBase.java
@@ -22,11 +22,14 @@ import
com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import
com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
import com.facebook.presto.spi.transaction.IsolationLevel;
+import java.util.Optional;
+
import static
com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
import static
com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
@@ -38,17 +41,34 @@ public abstract class PrestoConnectorBase implements
Connector {
private final PrestoSplitManager prestoSplitManager;
private final PrestoPageSourceProvider prestoPageSourceProvider;
private final PrestoMetadata prestoMetadata;
+ private final Optional<PrestoPlanOptimizerProvider>
prestoPlanOptimizerProvider;
public PrestoConnectorBase(
PrestoTransactionManager transactionManager,
PrestoSplitManager prestoSplitManager,
PrestoPageSourceProvider prestoPageSourceProvider,
PrestoMetadata prestoMetadata) {
+ this(
+ transactionManager,
+ prestoSplitManager,
+ prestoPageSourceProvider,
+ prestoMetadata,
+ Optional.empty());
+ }
+
+ public PrestoConnectorBase(
+ PrestoTransactionManager transactionManager,
+ PrestoSplitManager prestoSplitManager,
+ PrestoPageSourceProvider prestoPageSourceProvider,
+ PrestoMetadata prestoMetadata,
+ Optional<PrestoPlanOptimizerProvider> prestoPlanOptimizerProvider)
{
this.transactionManager = requireNonNull(transactionManager,
"transactionManager is null");
this.prestoSplitManager = requireNonNull(prestoSplitManager,
"prestoSplitManager is null");
this.prestoPageSourceProvider =
requireNonNull(prestoPageSourceProvider,
"prestoPageSourceProvider is null");
this.prestoMetadata = requireNonNull(prestoMetadata, "prestoMetadata
is null");
+ this.prestoPlanOptimizerProvider =
+ requireNonNull(prestoPlanOptimizerProvider,
"prestoPlanOptimizerProvider is null");
}
@Override
@@ -83,4 +103,9 @@ public abstract class PrestoConnectorBase implements
Connector {
public void rollback(ConnectorTransactionHandle transaction) {
transactionManager.remove(transaction);
}
+
+ /**
+ * Returns the plan optimizer provider supplied at construction time.
+ *
+ * @return the configured {@link ConnectorPlanOptimizerProvider}
+ * @throws UnsupportedOperationException if this connector was built without a provider (the
+ *     stored {@code Optional} is empty)
+ */
+ @Override
+ public ConnectorPlanOptimizerProvider getConnectorPlanOptimizerProvider() {
+ return
prestoPlanOptimizerProvider.orElseThrow(UnsupportedOperationException::new);
+ }
}
diff --git
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java
new file mode 100644
index 0000000..7092753
--- /dev/null
+++
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.presto;
+
+import com.facebook.presto.spi.ConnectorPlanOptimizer;
+import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
+import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.relation.RowExpressionService;
+import com.google.common.collect.ImmutableSet;
+
+import javax.inject.Inject;
+
+import java.util.Set;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ConnectorPlanOptimizerProvider} of the Paimon connector: contributes a
+ * {@link PrestoComputePushdown} as the only logical plan optimizer and no physical ones.
+ */
+public class PrestoPlanOptimizerProvider implements ConnectorPlanOptimizerProvider {
+
+    private final StandardFunctionResolution functionResolution;
+    private final RowExpressionService rowExpressionService;
+
+    /**
+     * Creates the provider.
+     *
+     * @param functionResolution passed to every {@link PrestoComputePushdown} created here; must
+     *     not be null
+     * @param rowExpressionService passed to every {@link PrestoComputePushdown} created here;
+     *     must not be null
+     * @throws NullPointerException if either argument is null
+     */
+    @Inject
+    public PrestoPlanOptimizerProvider(
+            StandardFunctionResolution functionResolution,
+            RowExpressionService rowExpressionService) {
+        requireNonNull(functionResolution, "functionResolution is null");
+        requireNonNull(rowExpressionService, "rowExpressionService is null");
+        this.functionResolution = functionResolution;
+        this.rowExpressionService = rowExpressionService;
+    }
+
+    /** Returns a single-element set holding a newly created {@link PrestoComputePushdown}. */
+    @Override
+    public Set<ConnectorPlanOptimizer> getLogicalPlanOptimizers() {
+        ConnectorPlanOptimizer pushdown =
+                new PrestoComputePushdown(functionResolution, rowExpressionService);
+        return ImmutableSet.of(pushdown);
+    }
+
+    /** Returns an empty set: no physical plan optimizers are contributed. */
+    @Override
+    public Set<ConnectorPlanOptimizer> getPhysicalPlanOptimizers() {
+        return ImmutableSet.of();
+    }
+}
diff --git
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
new file mode 100644
index 0000000..3151ce0
--- /dev/null
+++
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.presto;
+
+import org.apache.paimon.types.BigIntType;
+
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.metadata.FunctionAndTypeManager;
+import com.facebook.presto.metadata.MetadataManager;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorId;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.TableHandle;
+import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.plan.FilterNode;
+import com.facebook.presto.spi.plan.PlanNode;
+import com.facebook.presto.spi.plan.PlanNodeId;
+import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
+import com.facebook.presto.spi.plan.TableScanNode;
+import com.facebook.presto.spi.relation.DeterminismEvaluator;
+import com.facebook.presto.spi.relation.DomainTranslator;
+import com.facebook.presto.spi.relation.ExpressionOptimizer;
+import com.facebook.presto.spi.relation.PredicateCompiler;
+import com.facebook.presto.spi.relation.RowExpression;
+import com.facebook.presto.spi.relation.RowExpressionService;
+import com.facebook.presto.spi.relation.VariableReferenceExpression;
+import com.facebook.presto.sql.TestingRowExpressionTranslator;
+import com.facebook.presto.sql.gen.RowExpressionPredicateCompiler;
+import com.facebook.presto.sql.planner.PlanVariableAllocator;
+import com.facebook.presto.sql.planner.TypeProvider;
+import com.facebook.presto.sql.planner.planPrinter.RowExpressionFormatter;
+import com.facebook.presto.sql.relational.FunctionResolution;
+import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator;
+import com.facebook.presto.sql.relational.RowExpressionDomainTranslator;
+import com.facebook.presto.sql.relational.RowExpressionOptimizer;
+import com.facebook.presto.testing.TestingConnectorSession;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import static com.facebook.presto.common.type.BigintType.BIGINT;
+import static
com.facebook.presto.metadata.FunctionAndTypeManager.createTestFunctionAndTypeManager;
+import static
com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder.expression;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PrestoComputePushdown}. */
+public class TestPrestoComputePushdown {
+
+ public static final MetadataManager METADATA =
MetadataManager.createTestMetadataManager();
+
+ public static final StandardFunctionResolution FUNCTION_RESOLUTION =
+ new
FunctionResolution(FunctionAndTypeManager.createTestFunctionAndTypeManager());
+
+ /**
+ * Test {@link RowExpressionService} backed by {@link #METADATA}. Each accessor builds a new
+ * helper instance on every call.
+ */
+ public static final RowExpressionService ROW_EXPRESSION_SERVICE =
+ new RowExpressionService() {
+ @Override
+ public DomainTranslator getDomainTranslator() {
+ return new RowExpressionDomainTranslator(METADATA);
+ }
+
+ @Override
+ public ExpressionOptimizer getExpressionOptimizer() {
+ return new RowExpressionOptimizer(METADATA);
+ }
+
+ @Override
+ public PredicateCompiler getPredicateCompiler() {
+ return new RowExpressionPredicateCompiler(METADATA);
+ }
+
+ @Override
+ public DeterminismEvaluator getDeterminismEvaluator() {
+ return new RowExpressionDeterminismEvaluator(METADATA);
+ }
+
+ // Formats with the function and type manager taken from METADATA.
+ @Override
+ public String formatRowExpression(
+ ConnectorSession session, RowExpression expression) {
+ return new
RowExpressionFormatter(METADATA.getFunctionAndTypeManager())
+ .formatRowExpression(session, expression);
+ }
+ };
+
+    /**
+     * Builds a table scan over a "test"."test" Paimon table that exposes one BIGINT variable
+     * {@code a} mapped to column {@code id}, with unconstrained ({@code TupleDomain.all()})
+     * current and enforced constraints and a layout that carries no filter or projection.
+     */
+    private TableScanNode createTableScan() {
+        VariableReferenceExpression variableA =
+                new PlanVariableAllocator().newVariable("a", BIGINT);
+        ColumnHandle idColumn =
+                PrestoColumnHandle.create(
+                        "id", new BigIntType(), createTestFunctionAndTypeManager());
+        Map<VariableReferenceExpression, ColumnHandle> assignments =
+                ImmutableMap.<VariableReferenceExpression, ColumnHandle>builder()
+                        .put(variableA, idColumn)
+                        .build();
+
+        // Handle attached to the scan itself.
+        PrestoTableHandle scanTableHandle =
+                new PrestoTableHandle("test", "test", "table".getBytes());
+
+        // Handle inside the layout: explicitly "no filter, no projection".
+        PrestoTableLayoutHandle layoutHandle =
+                new PrestoTableLayoutHandle(
+                        new PrestoTableHandle(
+                                "test",
+                                "test",
+                                "table".getBytes(),
+                                TupleDomain.all(),
+                                Optional.empty()),
+                        TupleDomain.all());
+
+        TableHandle table =
+                new TableHandle(
+                        new ConnectorId("paimon"),
+                        scanTableHandle,
+                        new PrestoTransactionHandle(),
+                        Optional.of(layoutHandle));
+
+        PlanNodeId scanId = new PlanNodeId(UUID.randomUUID().toString());
+        return new TableScanNode(
+                Optional.empty(),
+                scanId,
+                table,
+                ImmutableList.copyOf(assignments.keySet()),
+                assignments,
+                TupleDomain.all(),
+                TupleDomain.all());
+    }
+
+    /**
+     * Builds a filter node with predicate {@code a = 1} ({@code a} typed BIGINT) on top of the
+     * scan from {@link #createTableScan()}.
+     */
+    private PlanNode createFilterNode() {
+        String sql = "a = 1";
+        TypeProvider types = TypeProvider.copyOf(ImmutableMap.of("a", BIGINT));
+        TestingRowExpressionTranslator translator = new TestingRowExpressionTranslator(METADATA);
+        RowExpression predicate = translator.translateAndOptimize(expression(sql), types);
+
+        PlanNodeId filterId = new PlanNodeId(UUID.randomUUID().toString());
+        return new FilterNode(Optional.empty(), filterId, createTableScan(), predicate);
+    }
+
+    /**
+     * Optimizes {@code Filter(a = 1) -> TableScan} and checks that the scan's layout now carries a
+     * table handle whose filter domain and projected columns both consist of exactly the
+     * {@code id} column.
+     *
+     * <p>Presence is asserted explicitly. Guarding the checks with {@code Optional.ifPresent} or
+     * {@code allMatch} would let the test pass when nothing was pushed down at all.
+     */
+    @Test
+    public void testOptimizeFilter() {
+        // Column handle expected behind variable "a".
+        PrestoColumnHandle expectedColumn = new PrestoColumnHandle("id", "BIGINT", BIGINT);
+
+        PrestoComputePushdown prestoComputePushdown =
+                new PrestoComputePushdown(FUNCTION_RESOLUTION, ROW_EXPRESSION_SERVICE);
+
+        PlanNode inputPlan = createFilterNode();
+        ConnectorSession session = new TestingConnectorSession(ImmutableList.of());
+        PlanVariableAllocator variableAllocator = new PlanVariableAllocator();
+        PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
+
+        // Call optimize.
+        PlanNode result =
+                prestoComputePushdown.optimize(
+                        inputPlan, session, variableAllocator, idAllocator);
+
+        // The plan shape must still be Filter -> TableScan.
+        assertThat(result).isInstanceOf(FilterNode.class);
+        PlanNode resultSource = ((FilterNode) result).getSource();
+        assertThat(resultSource).isInstanceOf(TableScanNode.class);
+
+        TableHandle table = ((TableScanNode) resultSource).getTable();
+        PrestoTableLayoutHandle prestoTableLayoutHandle =
+                (PrestoTableLayoutHandle)
+                        table.getLayout()
+                                .orElseThrow(
+                                        () -> new IllegalStateException("Layout is not present"));
+        PrestoTableHandle pushedDownHandle = prestoTableLayoutHandle.getTableHandle();
+
+        // The pushed-down filter must exist and constrain exactly the expected column.
+        TupleDomain<PrestoColumnHandle> filter = pushedDownHandle.getFilter();
+        assertThat(filter).isNotNull();
+        assertThat(filter.getDomains()).isPresent();
+        assertThat(filter.getDomains().get().keySet()).containsOnly(expectedColumn);
+
+        // The projected columns must exist and be exactly the expected column.
+        Optional<List<ColumnHandle>> projectedColumns = pushedDownHandle.getProjectedColumns();
+        assertThat(projectedColumns).isPresent();
+        assertThat(projectedColumns.get()).containsExactly(expectedColumn);
+    }
+}