This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch mongo in repository https://gitbox.apache.org/repos/asf/drill.git
commit 40ff39dfb11d4f5bd21cf572c883c41726e49cb5 Author: Volodymyr Vysotskyi <[email protected]> AuthorDate: Thu Aug 5 19:06:00 2021 +0300 Add join rel, fix project --- .../exec/store/mongo/MongoAggregateUtils.java | 15 ++ .../drill/exec/store/mongo/MongoStoragePlugin.java | 5 +- .../store/mongo/plan/MongoPluginImplementor.java | 97 +++++++--- .../store/mongo/plan/MongoPluginRulesProvider.java | 76 -------- .../drill/exec/store/mongo/plan/MongoTable.java | 197 --------------------- .../store/mongo/plan/RexToMongoTranslator.java | 163 ++++++++++------- .../exec/store/mongo/TestMongoLimitPushDown.java | 8 +- .../exec/store/mongo/TestMongoProjectPushDown.java | 12 +- .../drill/exec/store/PlannableStoragePlugin.java | 14 +- .../drill/exec/store/PluginRulesProvider.java | 1 + .../drill/exec/store/PluginRulesProviderImpl.java | 97 ++++++++++ .../exec/store/plan/AbstractPluginImplementor.java | 113 ++++++++++++ .../drill/exec/store/plan/PluginImplementor.java | 27 ++- .../exec/store/plan/rel/PluginAggregateRel.java | 33 +--- .../drill/exec/store/plan/rel/PluginFilterRel.java | 5 + .../drill/exec/store/plan/rel/PluginJoinRel.java | 34 ++++ .../drill/exec/store/plan/rel/PluginLimitRel.java | 5 + .../exec/store/plan/rel/PluginProjectRel.java | 5 + .../drill/exec/store/plan/rel/PluginRel.java | 1 + .../drill/exec/store/plan/rel/PluginSortRel.java | 5 + .../drill/exec/store/plan/rel/PluginUnionRel.java | 5 + .../store/plan/rel/StoragePluginTableScan.java | 5 + .../exec/store/plan/rule/PluginAggregateRule.java | 31 ++-- .../exec/store/plan/rule/PluginConverterRule.java | 40 ++++- .../exec/store/plan/rule/PluginFilterRule.java | 9 +- .../rule/PluginIntermediatePrelConverterRule.java | 35 ++-- .../drill/exec/store/plan/rule/PluginJoinRule.java | 32 ++++ .../exec/store/plan/rule/PluginLimitRule.java | 16 +- .../exec/store/plan/rule/PluginProjectRule.java | 15 +- .../drill/exec/store/plan/rule/PluginSortRule.java | 17 +- .../exec/store/plan/rule/PluginUnionRule.java | 29 +-- 31 files changed, 675 insertions(+), 472 deletions(-) diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java index e362707..9f551f8 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java @@ -159,4 +159,19 @@ public class MongoAggregateUtils { } return null; } + + public static boolean supportsAggregation(AggregateCall aggregateCall) { + String name = aggregateCall.getAggregation().getName(); + return name.equals(SqlStdOperatorTable.COUNT.getName()) + || name.equals(SqlStdOperatorTable.SUM.getName()) + || name.equals(SqlStdOperatorTable.SUM0.getName()) + || name.equals(SqlStdOperatorTable.MIN.getName()) + || name.equals(SqlStdOperatorTable.MAX.getName()) + || name.equals(SqlStdOperatorTable.AVG.getName()) + || name.equals(SqlStdOperatorTable.FIRST.getName()) + || name.equals(SqlStdOperatorTable.LAST.getName()) + || name.equals(SqlStdOperatorTable.STDDEV.getName()) + || name.equals(SqlStdOperatorTable.STDDEV_SAMP.getName()) + || name.equals(SqlStdOperatorTable.STDDEV_POP.getName()); + } } diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java index 6262711..df0588d 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java @@ -36,7 +36,8 @@ import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.PlannableStoragePlugin; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.StoragePlugin; -import org.apache.drill.exec.store.mongo.plan.MongoPluginRulesProvider; +import org.apache.drill.exec.store.mongo.plan.MongoPluginImplementor; +import org.apache.drill.exec.store.PluginRulesProviderImpl; import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory; import org.apache.drill.exec.store.plan.rel.PluginRel; import org.apache.drill.exec.store.security.HadoopCredentialsProvider; @@ -82,7 +83,7 @@ public class MongoStoragePlugin extends PlannableStoragePlugin implements Storag private static MongoStoragePluginConfigs mongoStoragePluginBuilder(String name) { Convention convention = new Convention.Impl("MONGO." + name, PluginRel.class); return new MongoStoragePluginConfigs() - .rulesProvider(new MongoPluginRulesProvider(convention)) + .rulesProvider(new PluginRulesProviderImpl(convention, MongoPluginImplementor::new)) .supportsProjectPushdown(true) .supportsSortPushdown(true) .supportsAggregatePushdown(true) diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java index 5561047..8680c6d 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java @@ -4,15 +4,22 @@ import com.mongodb.client.model.Aggregates; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.Union; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.util.Pair; -import org.apache.calcite.util.Util; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.common.DrillLimitRelBase; import org.apache.drill.exec.planner.logical.DrillOptiq; import org.apache.drill.exec.planner.logical.DrillParseContext; import org.apache.drill.exec.planner.physical.PrelUtil; @@ -20,7 +27,7 @@ import org.apache.drill.exec.store.mongo.MongoAggregateUtils; import org.apache.drill.exec.store.mongo.MongoFilterBuilder; import org.apache.drill.exec.store.mongo.MongoGroupScan; import org.apache.drill.exec.store.mongo.MongoScanSpec; -import org.apache.drill.exec.store.plan.PluginImplementor; +import org.apache.drill.exec.store.plan.AbstractPluginImplementor; import org.apache.drill.exec.store.plan.rel.PluginAggregateRel; import org.apache.drill.exec.store.plan.rel.PluginFilterRel; import org.apache.drill.exec.store.plan.rel.PluginLimitRel; @@ -43,7 +50,7 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -public class MongoPluginImplementor implements PluginImplementor { +public class MongoPluginImplementor extends AbstractPluginImplementor { private MongoGroupScan groupScan; private List<Bson> operations; private Document filters; @@ -88,11 +95,11 @@ public class MongoPluginImplementor implements PluginImplementor { if (limit.getOffset() != null) { operations.add( - Aggregates.skip(((BigDecimal) ((RexLiteral) limit.getOffset()).getValue()).intValue()).toBsonDocument()); + Aggregates.skip(rexLiteralIntValue((RexLiteral) limit.getOffset())).toBsonDocument()); } if (limit.getFetch() != null) { operations.add( - Aggregates.limit(((BigDecimal) ((RexLiteral) limit.getFetch()).getValue()).intValue()).toBsonDocument()); + Aggregates.limit(rexLiteralIntValue((RexLiteral) limit.getFetch())).toBsonDocument()); } } @@ -137,43 +144,34 @@ public class MongoPluginImplementor implements PluginImplementor { visitChild(sort.getInput()); if (!sort.collation.getFieldCollations().isEmpty()) { - final List<String> keys = new ArrayList<>(); - final List<RelDataTypeField> fields = sort.getRowType().getFieldList(); + BsonDocument sortKeys = new BsonDocument(); + List<RelDataTypeField> fields = sort.getRowType().getFieldList(); for (RelFieldCollation fieldCollation : sort.collation.getFieldCollations()) { - final String name = - fields.get(fieldCollation.getFieldIndex()).getName(); - keys.add(name + ": " + direction(fieldCollation)); - if (false) { - // TODO: NULLS FIRST and NULLS LAST - switch (fieldCollation.nullDirection) { - case FIRST: - break; - case LAST: - break; - default: - break; - } - } + String name = fields.get(fieldCollation.getFieldIndex()).getName(); + sortKeys.put(name, new BsonInt32(direction(fieldCollation))); } - operations.add( - Aggregates.sort(BsonDocument.parse(Util.toString(keys, "{", ", ", "}"))).toBsonDocument()); + operations.add(Aggregates.sort(sortKeys).toBsonDocument()); } if (sort.offset != null) { operations.add( - Aggregates.skip(((BigDecimal) ((RexLiteral) sort.offset).getValue()).intValue()).toBsonDocument()); + Aggregates.skip(rexLiteralIntValue((RexLiteral) sort.offset)).toBsonDocument()); } if (sort.fetch != null) { operations.add( - Aggregates.limit(((BigDecimal) ((RexLiteral) sort.fetch).getValue()).intValue()).toBsonDocument()); + Aggregates.limit(rexLiteralIntValue((RexLiteral) sort.fetch)).toBsonDocument()); } } + private int rexLiteralIntValue(RexLiteral offset) { + return ((BigDecimal) offset.getValue()).intValue(); + } + @Override public void implement(PluginUnionRel union) throws IOException { runAggregate = true; - MongoPluginImplementor childImplementor = copy(); + MongoPluginImplementor childImplementor = new MongoPluginImplementor(); childImplementor.runAggregate = true; boolean firstProcessed = false; @@ -199,8 +197,51 @@ public class MongoPluginImplementor implements PluginImplementor { } @Override - public MongoPluginImplementor copy() { - return new MongoPluginImplementor(); + public boolean canImplement(Aggregate aggregate) { + return aggregate.getGroupType() == Aggregate.Group.SIMPLE + && aggregate.getAggCallList().stream() + .noneMatch(AggregateCall::isDistinct) + && aggregate.getAggCallList().stream() + .allMatch(MongoAggregateUtils::supportsAggregation); + } + + @Override + public boolean canImplement(Filter filter) { + LogicalExpression conditionExp = DrillOptiq.toDrill( + new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())), + filter.getInput(), + filter.getCondition()); + MongoFilterBuilder filterBuilder = new MongoFilterBuilder(conditionExp); + filterBuilder.parseTree(); + return filterBuilder.isAllExpressionsConverted(); + } + + @Override + public boolean canImplement(DrillLimitRelBase limit) { + return true; + } + + @Override + public boolean canImplement(Project project) { + return project.getProjects().stream() + .allMatch(RexToMongoTranslator::supportsExpression); + } + + @Override + public boolean canImplement(Sort sort) { + return true; + } + + @Override + public boolean canImplement(Union union) { + // allow converting for union all only, since Drill adds extra aggregation for union distinct, + // so we will convert both union all and aggregation later + return union.all; + } + + @Override + public boolean canImplement(TableScan scan) { + return true; } @Override diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginRulesProvider.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginRulesProvider.java deleted file mode 100644 index 06319b6..0000000 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginRulesProvider.java +++ /dev/null @@ -1,76 +0,0 @@ -package org.apache.drill.exec.store.mongo.plan; - -import org.apache.calcite.plan.Convention; -import org.apache.calcite.plan.RelOptRule; -import org.apache.drill.exec.planner.logical.DrillRel; -import org.apache.drill.exec.store.PluginRulesProvider; -import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule; -import org.apache.drill.exec.store.plan.rule.PluginAggregateRule; -import org.apache.drill.exec.store.plan.rule.PluginFilterRule; -import org.apache.drill.exec.store.plan.rule.PluginIntermediatePrelConverterRule; -import org.apache.drill.exec.store.plan.rule.PluginLimitRule; -import org.apache.drill.exec.store.plan.rule.PluginProjectRule; -import org.apache.drill.exec.store.plan.rule.PluginSortRule; -import org.apache.drill.exec.store.plan.rule.PluginUnionRule; - -import java.util.Arrays; -import java.util.List; - -public class MongoPluginRulesProvider implements PluginRulesProvider { - private final Convention convention; - - public MongoPluginRulesProvider(Convention convention) { - this.convention = convention; - } - - public List<RelOptRule> sortRules() { - return Arrays.asList( - new PluginSortRule(Convention.NONE, convention), - new PluginSortRule(DrillRel.DRILL_LOGICAL, convention) - ); - } - - public List<RelOptRule> limitRules() { - return Arrays.asList( - new PluginLimitRule(Convention.NONE, convention), - new PluginLimitRule(DrillRel.DRILL_LOGICAL, convention) - ); - } - - public List<RelOptRule> filterRules() { - return Arrays.asList( - new PluginFilterRule(Convention.NONE, convention), - new PluginFilterRule(DrillRel.DRILL_LOGICAL, convention) - ); - } - - public List<RelOptRule> projectRules() { - return Arrays.asList( - new PluginProjectRule(Convention.NONE, convention), - new PluginProjectRule(DrillRel.DRILL_LOGICAL, convention) - ); - } - - public List<RelOptRule> aggregateRules() { - return Arrays.asList( - new PluginAggregateRule(Convention.NONE, convention), - new PluginAggregateRule(DrillRel.DRILL_LOGICAL, convention) - ); - } - - public List<RelOptRule> unionRules() { - return Arrays.asList( - new PluginUnionRule(Convention.NONE, convention), - new PluginUnionRule(DrillRel.DRILL_LOGICAL, convention) - ); - } - - public RelOptRule vertexRule() { - return new VertexDrelConverterRule(convention); - } - - @Override - public RelOptRule prelConverterRule() { - return new PluginIntermediatePrelConverterRule(MongoPluginImplementor::new); - } -} diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoTable.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoTable.java deleted file mode 100644 index 68e01e9..0000000 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoTable.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.mongo.plan; - -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoDatabase; -import org.apache.calcite.adapter.java.AbstractQueryableTable; -import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.linq4j.Enumerator; -import org.apache.calcite.linq4j.QueryProvider; -import org.apache.calcite.linq4j.Queryable; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.TranslatableTable; -import org.apache.calcite.schema.impl.AbstractTableQueryable; -import org.apache.calcite.sql.type.SqlTypeName; -import org.bson.BsonDocument; -import org.bson.conversions.Bson; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * Table based on a MongoDB collection. - */ -public class MongoTable extends AbstractQueryableTable - implements TranslatableTable { - private final String collectionName; - - /** Creates a MongoTable. */ - MongoTable(String collectionName) { - super(Object[].class); - this.collectionName = collectionName; - } - - @Override public String toString() { - return "MongoTable {" + collectionName + "}"; - } - - @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { - final RelDataType mapType = - typeFactory.createMapType( - typeFactory.createSqlType(SqlTypeName.VARCHAR), - typeFactory.createTypeWithNullability( - typeFactory.createSqlType(SqlTypeName.ANY), true)); - return typeFactory.builder().add("_MAP", mapType).build(); - } - - @Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider, - SchemaPlus schema, String tableName) { - return new MongoQueryable<>(queryProvider, schema, this, tableName); - } - - @Override public RelNode toRel( - RelOptTable.ToRelContext context, - RelOptTable relOptTable) { - final RelOptCluster cluster = context.getCluster(); - // cluster, traits, grpScan, table -// return new MongoTableScan(cluster, cluster.traitSetOf(MongoRel.CONVENTION), -// relOptTable, relOptTable.getRowType(), null); - return null; - } - - /** Executes a "find" operation on the underlying collection. - * - * <p>For example, - * <code>zipsTable.find("{state: 'OR'}", "{city: 1, zipcode: 1}")</code></p> - * - * @param mongoDb MongoDB connection - * @param filterJson Filter JSON string, or null - * @param projectJson Project JSON string, or null - * @param fields List of fields to project; or null to return map - * @return Enumerator of results - */ - private Enumerable<Object> find(MongoDatabase mongoDb, String filterJson, - String projectJson, List<Map.Entry<String, Class>> fields) { - final MongoCollection collection = - mongoDb.getCollection(collectionName); - final Bson filter = - filterJson == null ? null : BsonDocument.parse(filterJson); - final Bson project = - projectJson == null ? null : BsonDocument.parse(projectJson); -// final Function1<Document, Object> getter = MongoEnumerator.getter(fields); -// return new AbstractEnumerable<Object>() { -// @Override public Enumerator<Object> enumerator() { -// @SuppressWarnings("unchecked") final FindIterable<Document> cursor = -// collection.find(filter).projection(project); -// return new MongoEnumerator(cursor.iterator(), getter); -// } -// }; - return null; - } - - /** Executes an "aggregate" operation on the underlying collection. - * - * <p>For example: - * <code>zipsTable.aggregate( - * "{$filter: {state: 'OR'}", - * "{$group: {_id: '$city', c: {$sum: 1}, p: {$sum: '$pop'}}}") - * </code></p> - * - * @param mongoDb MongoDB connection - * @param fields List of fields to project; or null to return map - * @param operations One or more JSON strings - * @return Enumerator of results - */ - private Enumerable<Object> aggregate(final MongoDatabase mongoDb, - List<Map.Entry<String, Class>> fields, - List<String> operations) { - List<Bson> list = new ArrayList<>(); - for (String operation : operations) { - list.add(BsonDocument.parse(operation)); - } -// Function1<Document, Object> getter = -// MongoEnumerator.getter(fields); -// return new AbstractEnumerable<Object>() { -// @Override public Enumerator<Object> enumerator() { -// final Iterator<Document> resultIterator; -// try { -// resultIterator = mongoDb.getCollection(collectionName) -// .aggregate(list).iterator(); -// } catch (Exception e) { -// throw new RuntimeException("While running MongoDB query " -// + Util.toString(operations, "[", ",\n", "]"), e); -// } -// return new MongoEnumerator(resultIterator, getter); -// } -// }; - return null; - } - - /** Implementation of {@link Queryable} based on - * a {@link MongoTable}. - * - * @param <T> element type */ - public static class MongoQueryable<T> extends AbstractTableQueryable<T> { - MongoQueryable(QueryProvider queryProvider, SchemaPlus schema, - MongoTable table, String tableName) { - super(queryProvider, schema, table, tableName); - } - - @Override public Enumerator<T> enumerator() { - //noinspection unchecked - final Enumerable<T> enumerable = - (Enumerable<T>) getTable().find(getMongoDb(), null, null, null); - return enumerable.enumerator(); - } - - private MongoDatabase getMongoDb() { -// return schema.unwrap(MongoSchemaFactory.MongoSchema.class).mongoDb; - return null; - } - - private MongoTable getTable() { - return (MongoTable) table; - } - - @SuppressWarnings("UnusedDeclaration") - public Enumerable<Object> aggregate(List<Map.Entry<String, Class>> fields, - List<String> operations) { - return getTable().aggregate(getMongoDb(), fields, operations); - } - - /** Called via code-generation. - * - * @param filterJson Filter document - * @param projectJson Projection document - * @param fields List of expected fields (and their types) - * @return result of mongo query - * - */ - @SuppressWarnings("UnusedDeclaration") - public Enumerable<Object> find(String filterJson, - String projectJson, List<Map.Entry<String, Class>> fields) { - return getTable().find(getMongoDb(), filterJson, projectJson, fields); - } - } -} diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java index 478591f..8320fa9 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java @@ -1,5 +1,6 @@ package org.apache.drill.exec.store.mongo.plan; +import com.google.common.collect.ImmutableMap; import org.apache.calcite.adapter.enumerable.RexImpTable; import org.apache.calcite.adapter.enumerable.RexToLixTranslator; import org.apache.calcite.adapter.java.JavaTypeFactory; @@ -21,7 +22,6 @@ import org.bson.BsonString; import org.bson.BsonValue; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -35,59 +35,57 @@ class RexToMongoTranslator extends RexVisitorImpl<BsonValue> { private final List<String> inFields; - private static final Map<SqlOperator, String> MONGO_OPERATORS = - new HashMap<>(); - - static { - MONGO_OPERATORS.put(SqlStdOperatorTable.DIVIDE, "$divide"); - MONGO_OPERATORS.put(SqlStdOperatorTable.MULTIPLY, "$multiply"); - MONGO_OPERATORS.put(SqlStdOperatorTable.ABS, "$abs"); - MONGO_OPERATORS.put(SqlStdOperatorTable.ACOS, "$acos"); - MONGO_OPERATORS.put(SqlStdOperatorTable.ASIN, "$asin"); - MONGO_OPERATORS.put(SqlStdOperatorTable.ATAN, "$atan"); - MONGO_OPERATORS.put(SqlStdOperatorTable.ATAN2, "$atan2"); - MONGO_OPERATORS.put(SqlStdOperatorTable.CEIL, "$ceil"); - MONGO_OPERATORS.put(SqlStdOperatorTable.CONCAT, "$concat"); - MONGO_OPERATORS.put(SqlStdOperatorTable.COS, "$cos"); - MONGO_OPERATORS.put(SqlStdOperatorTable.DAYOFMONTH, "$dayOfMonth"); - MONGO_OPERATORS.put(SqlStdOperatorTable.WEEK, "$isoWeek"); - MONGO_OPERATORS.put(SqlStdOperatorTable.YEAR, "$isoWeekYear"); - MONGO_OPERATORS.put(SqlStdOperatorTable.DAYOFWEEK, "$isoDayOfWeek"); - MONGO_OPERATORS.put(SqlStdOperatorTable.DAYOFYEAR, "$dayOfYear"); - MONGO_OPERATORS.put(SqlStdOperatorTable.RADIANS, "$degreesToRadians"); - MONGO_OPERATORS.put(SqlStdOperatorTable.DENSE_RANK, "$denseRank"); - MONGO_OPERATORS.put(SqlStdOperatorTable.EXP, "$exp"); - MONGO_OPERATORS.put(SqlStdOperatorTable.FLOOR, "$floor"); - MONGO_OPERATORS.put(SqlStdOperatorTable.HOUR, "$hour"); - MONGO_OPERATORS.put(SqlStdOperatorTable.LN, "$ln"); - MONGO_OPERATORS.put(SqlStdOperatorTable.LOG10, "$log10"); - MONGO_OPERATORS.put(SqlStdOperatorTable.MINUTE, "$minute"); - MONGO_OPERATORS.put(SqlStdOperatorTable.MOD, "$mod"); - MONGO_OPERATORS.put(SqlStdOperatorTable.MONTH, "$month"); - MONGO_OPERATORS.put(SqlStdOperatorTable.POWER, "$pow"); - MONGO_OPERATORS.put(SqlStdOperatorTable.DEGREES, "$radiansToDegrees"); - MONGO_OPERATORS.put(SqlStdOperatorTable.RAND, "$rand"); - MONGO_OPERATORS.put(SqlStdOperatorTable.REPLACE, "$replaceAll"); - MONGO_OPERATORS.put(SqlStdOperatorTable.ROUND, "$round"); - MONGO_OPERATORS.put(SqlStdOperatorTable.SECOND, "$second"); - MONGO_OPERATORS.put(SqlStdOperatorTable.SIN, "$sin"); - MONGO_OPERATORS.put(SqlStdOperatorTable.SQRT, "$sqrt"); - MONGO_OPERATORS.put(SqlStdOperatorTable.SUBSTRING, "$substr"); - MONGO_OPERATORS.put(SqlStdOperatorTable.PLUS, "$add"); - MONGO_OPERATORS.put(SqlStdOperatorTable.MINUS, "$subtract"); - MONGO_OPERATORS.put(SqlStdOperatorTable.TAN, "$tan"); - MONGO_OPERATORS.put(SqlStdOperatorTable.TRIM, "trim"); - MONGO_OPERATORS.put(SqlStdOperatorTable.TRUNCATE, "$trunc"); - MONGO_OPERATORS.put(SqlStdOperatorTable.AND, MongoOp.AND.getCompareOp()); - MONGO_OPERATORS.put(SqlStdOperatorTable.OR, MongoOp.OR.getCompareOp()); - MONGO_OPERATORS.put(SqlStdOperatorTable.NOT, MongoOp.NOT.getCompareOp()); - MONGO_OPERATORS.put(SqlStdOperatorTable.EQUALS, MongoOp.EQUAL.getCompareOp()); - MONGO_OPERATORS.put(SqlStdOperatorTable.NOT_EQUALS, MongoOp.NOT_EQUAL.getCompareOp()); - MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN, MongoOp.GREATER.getCompareOp()); - MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, MongoOp.GREATER_OR_EQUAL.getCompareOp()); - MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN, MongoOp.LESS.getCompareOp()); - MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, MongoOp.LESS_OR_EQUAL.getCompareOp()); - } + private static final Map<SqlOperator, String> MONGO_OPERATORS = ImmutableMap.<SqlOperator, String>builder() + .put(SqlStdOperatorTable.DIVIDE, "$divide") + .put(SqlStdOperatorTable.MULTIPLY, "$multiply") + .put(SqlStdOperatorTable.ABS, "$abs") + .put(SqlStdOperatorTable.ACOS, "$acos") + .put(SqlStdOperatorTable.ASIN, "$asin") + .put(SqlStdOperatorTable.ATAN, "$atan") + .put(SqlStdOperatorTable.ATAN2, "$atan2") + .put(SqlStdOperatorTable.CEIL, "$ceil") + .put(SqlStdOperatorTable.CONCAT, "$concat") + .put(SqlStdOperatorTable.COS, "$cos") + .put(SqlStdOperatorTable.DAYOFMONTH, "$dayOfMonth") + .put(SqlStdOperatorTable.WEEK, "$isoWeek") + .put(SqlStdOperatorTable.YEAR, "$isoWeekYear") + .put(SqlStdOperatorTable.DAYOFWEEK, "$isoDayOfWeek") + .put(SqlStdOperatorTable.DAYOFYEAR, "$dayOfYear") + .put(SqlStdOperatorTable.RADIANS, "$degreesToRadians") + .put(SqlStdOperatorTable.DENSE_RANK, "$denseRank") + .put(SqlStdOperatorTable.EXP, "$exp") + .put(SqlStdOperatorTable.FLOOR, "$floor") + .put(SqlStdOperatorTable.HOUR, "$hour") + .put(SqlStdOperatorTable.LN, "$ln") + .put(SqlStdOperatorTable.LOG10, "$log10") + .put(SqlStdOperatorTable.MINUTE, "$minute") + .put(SqlStdOperatorTable.MOD, "$mod") + .put(SqlStdOperatorTable.MONTH, "$month") + .put(SqlStdOperatorTable.POWER, "$pow") + .put(SqlStdOperatorTable.DEGREES, "$radiansToDegrees") + .put(SqlStdOperatorTable.RAND, "$rand") + .put(SqlStdOperatorTable.REPLACE, "$replaceAll") + .put(SqlStdOperatorTable.ROUND, "$round") + .put(SqlStdOperatorTable.SECOND, "$second") + .put(SqlStdOperatorTable.SIN, "$sin") + .put(SqlStdOperatorTable.SQRT, "$sqrt") + .put(SqlStdOperatorTable.SUBSTRING, "$substr") + .put(SqlStdOperatorTable.PLUS, "$add") + .put(SqlStdOperatorTable.MINUS, "$subtract") + .put(SqlStdOperatorTable.TAN, "$tan") + .put(SqlStdOperatorTable.TRIM, "trim") + .put(SqlStdOperatorTable.TRUNCATE, "$trunc") + .put(SqlStdOperatorTable.AND, MongoOp.AND.getCompareOp()) + .put(SqlStdOperatorTable.OR, MongoOp.OR.getCompareOp()) + .put(SqlStdOperatorTable.NOT, MongoOp.NOT.getCompareOp()) + .put(SqlStdOperatorTable.EQUALS, MongoOp.EQUAL.getCompareOp()) + .put(SqlStdOperatorTable.NOT_EQUALS, MongoOp.NOT_EQUAL.getCompareOp()) + .put(SqlStdOperatorTable.GREATER_THAN, MongoOp.GREATER.getCompareOp()) + .put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, MongoOp.GREATER_OR_EQUAL.getCompareOp()) + .put(SqlStdOperatorTable.LESS_THAN, MongoOp.LESS.getCompareOp()) + .put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, MongoOp.LESS_OR_EQUAL.getCompareOp()) + .build(); + protected RexToMongoTranslator(JavaTypeFactory typeFactory, List<String> inFields) { @@ -129,11 +127,14 @@ class RexToMongoTranslator extends RexVisitorImpl<BsonValue> { return new BsonDocument(stdOperator, new BsonArray(strings)); } if (call.getOperator() == SqlStdOperatorTable.ITEM) { - final RexNode op1 = call.operands.get(1); - if (op1 instanceof RexLiteral - && op1.getType().getSqlTypeName() == SqlTypeName.INTEGER) { - return new BsonDocument("$arrayElemAt", new BsonArray( - Arrays.asList(strings.get(0), new BsonInt32(((RexLiteral) op1).getValueAs(Integer.class))))); + RexNode op1 = call.operands.get(1); + if (op1 instanceof RexLiteral) { + if (op1.getType().getSqlTypeName() == SqlTypeName.INTEGER) { + return new BsonDocument("$arrayElemAt", new BsonArray( + Arrays.asList(strings.get(0), new BsonInt32(((RexLiteral) op1).getValueAs(Integer.class))))); + } else if (op1.getType().getSqlTypeName() == SqlTypeName.CHAR) { + return new BsonString(strings.get(0).asString().getValue() + "." + ((RexLiteral) op1).getValueAs(String.class)); + } } } if (call.getOperator() == SqlStdOperatorTable.CASE) { @@ -171,8 +172,8 @@ class RexToMongoTranslator extends RexVisitorImpl<BsonValue> { if (call.getOperator() != SqlStdOperatorTable.ITEM) { return null; } - final RexNode op0 = call.operands.get(0); - final RexNode op1 = call.operands.get(1); + RexNode op0 = call.operands.get(0); + RexNode op1 = call.operands.get(1); if (op0 instanceof RexInputRef && ((RexInputRef) op0).getIndex() == 0 && op1 instanceof RexLiteral @@ -181,4 +182,44 @@ class RexToMongoTranslator extends RexVisitorImpl<BsonValue> { } return null; } + + public static boolean supportsExpression(RexNode expr) { + return expr.accept(new RexMongoChecker()); + } + + private static class RexMongoChecker extends RexVisitorImpl<Boolean> { + + protected RexMongoChecker() { + super(true); + } + + @Override + public Boolean visitLiteral(RexLiteral literal) { + return true; + } + + @Override + public Boolean visitInputRef(RexInputRef inputRef) { + return true; + } + + @Override + public Boolean visitCall(RexCall call) { + if (isItem(call) != null + || call.getKind() == SqlKind.CAST + || call.getOperator() == SqlStdOperatorTable.CASE + || MONGO_OPERATORS.get(call.getOperator()) != null) { + return true; + } + + if (call.getOperator() == SqlStdOperatorTable.ITEM) { + RexNode op = call.operands.get(1); + return op instanceof RexLiteral + && (op.getType().getSqlTypeName() == SqlTypeName.INTEGER + || op.getType().getSqlTypeName() == SqlTypeName.CHAR); + } + + return false; + } + } } diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java index bb660a0..2344df7 100644 --- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java +++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java @@ -33,7 +33,7 @@ public class TestMongoLimitPushDown extends MongoTestBase { .sql(sql) .planMatcher() .exclude("Limit\\(") - .include("MongoGroupScan.*limit=4") + .include("MongoGroupScan.*\"\\$limit\": 4") .match(); } @@ -44,7 +44,7 @@ public class TestMongoLimitPushDown extends MongoTestBase { .sql(sql) .planMatcher() .exclude("Limit") - .include("sort=\\{employee_id", "limit=4") + .include("MongoGroupScan.*\"\\$sort\": \\{\"employee_id\": 1}", "\"\\$limit\": 4") .match(); } @@ -55,7 +55,7 @@ public class TestMongoLimitPushDown extends MongoTestBase { .sql(sql) .planMatcher() .exclude("Limit") - .include("skip=5", "limit=4") + .include("\"\\$skip\": 5", "\"\\$limit\": 4") .match(); } @@ -66,7 +66,7 @@ public class TestMongoLimitPushDown extends MongoTestBase { .sql(sql) .planMatcher() .exclude("Limit") - .include("limit=4", "eq=52.17") + .include("\"\\$limit\": 4", "\"\\$eq\": 52\\.17") .match(); } } diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java index 48d13ce..a691443 100644 --- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java +++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java @@ -23,21 +23,21 @@ import static org.apache.drill.test.TestBuilder.mapOf; import org.apache.drill.categories.MongoStorageTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.exec.ExecConstants; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -@Ignore("DRILL-3775") @Category({SlowTest.class, MongoStorageTest.class}) public class TestMongoProjectPushDown extends MongoTestBase { - /** - * - * @throws Exception - */ @Test public void testComplexProjectPushdown() throws Exception { + queryBuilder() + .sql("select t.field_4.inner_3 as col_1, t.field_4 as col_2 from mongo.employee.schema_change t") + .planMatcher() + .include("MongoGroupScan.*\"\\$project\": \\{\"col_1\": \"\\$field_4.inner_3\", \"col_2\": \"\\$field_4\"\\}") + .match(); + try { testBuilder() .sqlQuery("select t.field_4.inner_3 as col_1, t.field_4 as col_2 from mongo.employee.schema_change t") diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java index c9d4b8c..7219921 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java @@ -59,6 +59,9 @@ public abstract class PlannableStoragePlugin extends AbstractStoragePlugin { if (plannableStoragePluginConfigs.supportsUnionPushdown) { builder.addAll(rulesProvider.unionRules()); } + if (plannableStoragePluginConfigs.supportsJoinPushdown) { + builder.addAll(rulesProvider.joinRules()); + } if (plannableStoragePluginConfigs.supportsAggregatePushdown) { builder.addAll(rulesProvider.aggregateRules()); } @@ -89,6 +92,7 @@ public abstract class PlannableStoragePlugin extends AbstractStoragePlugin { private boolean supportsAggregatePushdown; private boolean supportsSortPushdown; private boolean supportsUnionPushdown; + private boolean supportsJoinPushdown; private boolean supportsLimitPushdown; private PluginRulesProvider rulesProvider; private Convention convention; @@ -158,6 +162,15 @@ public abstract class PlannableStoragePlugin extends AbstractStoragePlugin { return self(); } + public boolean supportsJoinPushdown() { + return supportsJoinPushdown; + } + + public T supportsJoinPushdown(boolean supportsJoinPushdown) { + this.supportsJoinPushdown = supportsJoinPushdown; + return self(); + } + public boolean supportsLimitPushdown() { return supportsLimitPushdown; } @@ -185,5 +198,4 @@ public abstract class PlannableStoragePlugin extends AbstractStoragePlugin { return this; } } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java index d35bf83..ab566ba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java @@ -11,6 +11,7 @@ public interface PluginRulesProvider { List<RelOptRule> projectRules(); List<RelOptRule> aggregateRules(); List<RelOptRule> unionRules(); + List<RelOptRule> joinRules(); RelOptRule vertexRule(); RelOptRule prelConverterRule(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProviderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProviderImpl.java new file mode 100644 index 0000000..9337a42 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProviderImpl.java @@ -0,0 +1,97 @@ +package org.apache.drill.exec.store; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRule; +import org.apache.drill.exec.planner.logical.DrillRel; +import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule; +import org.apache.drill.exec.store.plan.PluginImplementor; +import org.apache.drill.exec.store.plan.rule.PluginAggregateRule; +import org.apache.drill.exec.store.plan.rule.PluginFilterRule; +import org.apache.drill.exec.store.plan.rule.PluginIntermediatePrelConverterRule; +import org.apache.drill.exec.store.plan.rule.PluginJoinRule; +import org.apache.drill.exec.store.plan.rule.PluginLimitRule; +import org.apache.drill.exec.store.plan.rule.PluginProjectRule; +import org.apache.drill.exec.store.plan.rule.PluginSortRule; +import org.apache.drill.exec.store.plan.rule.PluginUnionRule; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; + +public class PluginRulesProviderImpl implements PluginRulesProvider { + private final Supplier<PluginImplementor> implementorSupplier; + private final PluginImplementor pluginImplementor; + private final Convention convention; + + public PluginRulesProviderImpl(Convention convention, Supplier<PluginImplementor> implementorSupplier) { + this.convention = convention; + this.implementorSupplier = implementorSupplier; + this.pluginImplementor = implementorSupplier.get(); + } + + @Override + public List<RelOptRule> sortRules() { + return Arrays.asList( + new PluginSortRule(Convention.NONE, convention, pluginImplementor), + new PluginSortRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor) + ); + } + + @Override + public List<RelOptRule> limitRules() { + return Arrays.asList( + new PluginLimitRule(Convention.NONE, convention, pluginImplementor), + new PluginLimitRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor) + ); + } + + @Override + public List<RelOptRule> filterRules() { + return Arrays.asList( + new PluginFilterRule(Convention.NONE, convention, pluginImplementor), + new PluginFilterRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor) + ); + } + + @Override + public List<RelOptRule> projectRules() { + return Arrays.asList( + new PluginProjectRule(Convention.NONE, convention, pluginImplementor), + new PluginProjectRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor) + ); + } + + @Override + public List<RelOptRule> aggregateRules() { + return Arrays.asList( + new PluginAggregateRule(Convention.NONE, convention, pluginImplementor), + new PluginAggregateRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor) + ); + } + + @Override + public List<RelOptRule> unionRules() { + return Arrays.asList( + new PluginUnionRule(Convention.NONE, convention, pluginImplementor), + new PluginUnionRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor) + ); + } + + @Override + public List<RelOptRule> joinRules() { + return Arrays.asList( + new PluginJoinRule(Convention.NONE, convention, pluginImplementor), + new PluginJoinRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor) + ); + } + + @Override + public RelOptRule vertexRule() { + return new VertexDrelConverterRule(convention); + } + + @Override + public RelOptRule prelConverterRule() { + return new PluginIntermediatePrelConverterRule(convention, implementorSupplier); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java new file mode 100644 index 0000000..1d6c744 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java @@ -0,0 +1,113 @@ +package org.apache.drill.exec.store.plan; + +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.Union; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.planner.common.DrillLimitRelBase; +import org.apache.drill.exec.store.plan.rel.PluginAggregateRel; +import org.apache.drill.exec.store.plan.rel.PluginFilterRel; +import org.apache.drill.exec.store.plan.rel.PluginJoinRel; +import org.apache.drill.exec.store.plan.rel.PluginLimitRel; +import org.apache.drill.exec.store.plan.rel.PluginProjectRel; +import org.apache.drill.exec.store.plan.rel.PluginSortRel; +import org.apache.drill.exec.store.plan.rel.PluginUnionRel; +import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public abstract class AbstractPluginImplementor implements PluginImplementor { + private static final Logger logger = LoggerFactory.getLogger(AbstractPluginImplementor.class); + + @Override + public void implement(PluginAggregateRel aggregate) throws IOException { + throw getUnsupported("aggregate"); + } + + @Override + public void implement(PluginFilterRel filter) throws IOException { + throw getUnsupported("filter"); + } + + @Override + public void implement(PluginLimitRel limit) throws IOException { + throw getUnsupported("limit"); + } + + @Override + public void implement(PluginProjectRel project) throws IOException { + throw getUnsupported("project"); + } + + @Override + public void implement(PluginSortRel sort) throws IOException { + throw getUnsupported("sort"); + } + + @Override + public void implement(PluginUnionRel union) throws IOException { + throw getUnsupported("union"); + } + + @Override + public void implement(PluginJoinRel join) throws IOException { + throw getUnsupported("join"); + } + + @Override + public void implement(StoragePluginTableScan scan) throws IOException { + throw getUnsupported("scan"); + } + + @Override + public boolean canImplement(Aggregate aggregate) { + return false; + } + + @Override + public boolean canImplement(Filter filter) { + return false; + } + + @Override + public boolean canImplement(DrillLimitRelBase limit) { + return false; + } + + @Override + public boolean canImplement(Project project) { + return false; + } + + @Override + public boolean canImplement(Sort sort) { + return false; + } + + @Override + public boolean canImplement(Union union) { + return false; + } + + @Override + public boolean canImplement(TableScan scan) { + return false; + } + + @Override + public boolean canImplement(Join scan) { + return false; + } + + private UserException getUnsupported(String rel) { + return UserException.unsupportedError() + .message("Plugin implementor doesn't support push down for %", rel) + .build(logger); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java index 31f388d..77013e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java @@ -1,9 +1,18 @@ package org.apache.drill.exec.store.plan; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.Union; import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.common.DrillLimitRelBase; import org.apache.drill.exec.store.plan.rel.PluginAggregateRel; import org.apache.drill.exec.store.plan.rel.PluginFilterRel; +import org.apache.drill.exec.store.plan.rel.PluginJoinRel; import org.apache.drill.exec.store.plan.rel.PluginLimitRel; import org.apache.drill.exec.store.plan.rel.PluginProjectRel; import org.apache.drill.exec.store.plan.rel.PluginRel; @@ -27,9 +36,25 @@ public interface PluginImplementor { void implement(PluginUnionRel union) throws IOException; + void implement(PluginJoinRel join) throws IOException; + void implement(StoragePluginTableScan scan) throws IOException; - PluginImplementor copy(); + boolean canImplement(Aggregate aggregate); + + boolean canImplement(Filter filter); + + boolean canImplement(DrillLimitRelBase limit); + + boolean canImplement(Project project); + + boolean canImplement(Sort sort); + + boolean canImplement(Union union); + + boolean canImplement(Join scan); + + boolean canImplement(TableScan scan); GroupScan getPhysicalOperator() throws IOException; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java index 8505ac7..98b8ac1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java @@ -20,7 +20,6 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; @@ -40,38 +39,17 @@ public class PluginAggregateRel extends DrillAggregateRelBase implements PluginR RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, - List<AggregateCall> aggCalls) - throws InvalidRelException { + List<AggregateCall> aggCalls) { super(cluster, traitSet, input, groupSet, groupSets, aggCalls); assert getConvention() == input.getConvention(); - - for (AggregateCall aggCall : aggCalls) { - if (aggCall.isDistinct()) { - throw new InvalidRelException( - "distinct aggregation not supported"); - } - } - switch (getGroupType()) { - case SIMPLE: - break; - default: - throw new InvalidRelException("unsupported group type: " - + getGroupType()); - } } @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { - try { - return new PluginAggregateRel(getCluster(), traitSet, input, - groupSet, groupSets, aggCalls); - } catch (InvalidRelException e) { - // Semantic error not possible. Must be a bug. Convert to - // internal error. - throw new AssertionError(e); - } + return new PluginAggregateRel(getCluster(), traitSet, input, + groupSet, groupSets, aggCalls); } @Override @@ -84,4 +62,9 @@ public class PluginAggregateRel extends DrillAggregateRelBase implements PluginR public void implement(PluginImplementor implementor) throws IOException { implementor.implement(this); } + + @Override + public boolean canImplement(PluginImplementor implementor) { + return implementor.canImplement(this); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java index 661ce21..d36ace7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java @@ -56,4 +56,9 @@ public class PluginFilterRel extends DrillFilterRelBase implements PluginRel { public void implement(PluginImplementor implementor) throws IOException { implementor.implement(this); } + + @Override + public boolean canImplement(PluginImplementor implementor) { + return implementor.canImplement(this); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginJoinRel.java new file mode 100644 index 0000000..aaad170 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginJoinRel.java @@ -0,0 +1,34 @@ +package org.apache.drill.exec.store.plan.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexNode; +import org.apache.drill.exec.planner.common.DrillJoinRelBase; +import org.apache.drill.exec.store.plan.PluginImplementor; + +import java.io.IOException; + +public class PluginJoinRel extends DrillJoinRelBase implements PluginRel { + + public PluginJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, JoinRelType joinType) { + super(cluster, traits, left, right, condition, joinType); + } + + @Override + public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { + return new PluginJoinRel(getCluster(), traitSet, left, right, conditionExpr, joinType); + } + + @Override + public void implement(PluginImplementor implementor) throws IOException { + implementor.implement(this); + } + + @Override + public boolean canImplement(PluginImplementor implementor) { + return false; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java index b74aac4..4029a1d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java @@ -51,4 +51,9 @@ public class PluginLimitRel extends DrillLimitRelBase implements PluginRel { public void implement(PluginImplementor implementor) throws IOException { implementor.implement(this); } + + @Override + public boolean canImplement(PluginImplementor implementor) { + return implementor.canImplement(this); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java index 49a2cac..1168aaa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java @@ -56,4 +56,9 @@ public class PluginProjectRel extends DrillProjectRelBase implements PluginRel { public void implement(PluginImplementor implementor) throws IOException { implementor.implement(this); } + + @Override + public boolean canImplement(PluginImplementor implementor) { + return implementor.canImplement(this); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java index 7364e70..d6e1c47 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java @@ -8,4 +8,5 @@ import java.io.IOException; public interface PluginRel extends RelNode { void implement(PluginImplementor implementor) throws IOException; + boolean canImplement(PluginImplementor implementor); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java index 255bb2d..ac937b8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java @@ -58,4 +58,9 @@ public class PluginSortRel extends DrillSortRelBase implements PluginRel { public boolean canBeDropped() { return false; } + + @Override + public boolean canImplement(PluginImplementor implementor) { + return implementor.canImplement(this); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java index 73227bd..5f75cdc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java @@ -39,4 +39,9 @@ public class PluginUnionRel extends DrillUnionRelBase implements PluginRel { public void implement(PluginImplementor implementor) throws IOException { implementor.implement(this); } + + @Override + public boolean canImplement(PluginImplementor implementor) { + return implementor.canImplement(this); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java index a062b2e..fbfb085 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java @@ -71,4 +71,9 @@ public class StoragePluginTableScan extends DrillScanRelBase implements PluginRe protected String computeDigest() { return super.computeDigest(); } + + @Override + public boolean canImplement(PluginImplementor implementor) { + return implementor.canImplement(this); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java index 616493d..111787f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java @@ -2,37 +2,26 @@ package org.apache.drill.exec.store.plan.rule; import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelTrait; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; +import org.apache.drill.exec.store.plan.PluginImplementor; import org.apache.drill.exec.store.plan.rel.PluginAggregateRel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class PluginAggregateRule extends PluginConverterRule { - private static final Logger logger = LoggerFactory.getLogger(PluginAggregateRule.class); - public PluginAggregateRule(RelTrait in, Convention out) { - super(Aggregate.class, in, out, "PluginAggregateRule"); + public PluginAggregateRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) { + super(Aggregate.class, in, out, "PluginAggregateRule", pluginImplementor); } @Override public RelNode convert(RelNode rel) { Aggregate agg = (Aggregate) rel; - RelTraitSet traitSet = - agg.getTraitSet().replace(getOutConvention()); - try { - return new PluginAggregateRel( - rel.getCluster(), - traitSet, - convert(agg.getInput(), traitSet.simplify()), - agg.getGroupSet(), - agg.getGroupSets(), - agg.getAggCallList()); - } catch (InvalidRelException e) { - logger.warn(e.toString()); - return null; - } + return new PluginAggregateRel( + rel.getCluster(), + agg.getTraitSet().replace(getOutConvention()), + convert(agg.getInput(), agg.getTraitSet().replace(getOutConvention()).simplify()), + agg.getGroupSet(), + agg.getGroupSets(), + agg.getAggCallList()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java index 5801d83..15d6baf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java @@ -1,16 +1,54 @@ package org.apache.drill.exec.store.plan.rule; import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelTrait; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.core.Union; +import org.apache.drill.exec.planner.common.DrillLimitRelBase; import org.apache.drill.exec.planner.logical.DrillRelFactories; +import org.apache.drill.exec.store.plan.PluginImplementor; import java.util.function.Predicate; public abstract class PluginConverterRule extends ConverterRule { + private final PluginImplementor pluginImplementor; - protected PluginConverterRule(Class<? extends RelNode> clazz, RelTrait in, Convention out, String description) { + protected PluginConverterRule(Class<? extends RelNode> clazz, + RelTrait in, Convention out, String description, PluginImplementor pluginImplementor) { super(clazz, (Predicate<RelNode>) input -> true, in, out, DrillRelFactories.LOGICAL_BUILDER, description); + this.pluginImplementor = pluginImplementor; + } + + public PluginImplementor getPluginImplementor() { + return pluginImplementor; + } + + @Override + public boolean matches(RelOptRuleCall call) { + RelNode rel = call.rel(0); + boolean canImplement = false; + if (rel instanceof Aggregate) { + canImplement = pluginImplementor.canImplement(((Aggregate) rel)); + } else if (rel instanceof Filter) { + canImplement = pluginImplementor.canImplement(((Filter) rel)); + } else if (rel instanceof DrillLimitRelBase) { + canImplement = pluginImplementor.canImplement(((DrillLimitRelBase) rel)); + } else if (rel instanceof Project) { + canImplement = pluginImplementor.canImplement(((Project) rel)); + } else if (rel instanceof Sort) { + canImplement = pluginImplementor.canImplement(((Sort) rel)); + } else if (rel instanceof Union) { + canImplement = pluginImplementor.canImplement(((Union) rel)); + } else if (rel instanceof Join) { + canImplement = pluginImplementor.canImplement(((Join) rel)); + } + return canImplement && super.matches(call); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java index 8d350f4..4685d43 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java @@ -2,9 +2,9 @@ package org.apache.drill.exec.store.plan.rule; import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelTrait; -import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Filter; +import org.apache.drill.exec.store.plan.PluginImplementor; import org.apache.drill.exec.store.plan.rel.PluginFilterRel; /** @@ -12,18 +12,17 @@ import org.apache.drill.exec.store.plan.rel.PluginFilterRel; */ public class PluginFilterRule extends PluginConverterRule { - public PluginFilterRule(RelTrait in, Convention out) { - super(Filter.class, in, out, "PluginFilterRule"); + public PluginFilterRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) { + super(Filter.class, in, out, "PluginFilterRule", pluginImplementor); } @Override public RelNode convert(RelNode rel) { Filter filter = (Filter) rel; - RelTraitSet traitSet = filter.getTraitSet().replace(getOutConvention()); return new PluginFilterRel( getOutConvention(), rel.getCluster(), - traitSet, + filter.getTraitSet().replace(getOutConvention()), convert(filter.getInput(), getOutConvention()), filter.getCondition()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java index 5b08891..3cde901 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java @@ -17,34 +17,49 @@ */ package org.apache.drill.exec.store.plan.rule; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTrait; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillRelFactories; +import org.apache.drill.exec.planner.logical.RelOptHelper; import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.store.enumerable.plan.VertexDrel; import org.apache.drill.exec.store.plan.PluginImplementor; import org.apache.drill.exec.store.plan.rel.PluginIntermediatePrelRel; -import java.util.function.Predicate; import java.util.function.Supplier; -public class PluginIntermediatePrelConverterRule extends ConverterRule { - +public class PluginIntermediatePrelConverterRule extends RelOptRule { private final Supplier<PluginImplementor> implementorFactory; + private final RelTrait inTrait; + private final RelTrait outTrait; - public PluginIntermediatePrelConverterRule(Supplier<PluginImplementor> implementorFactory) { - super(VertexDrel.class, (Predicate<RelNode>) input -> true, DrillRel.DRILL_LOGICAL, - Prel.DRILL_PHYSICAL, DrillRelFactories.LOGICAL_BUILDER, "Plugin_prel_Converter"); + public PluginIntermediatePrelConverterRule(Convention convention, Supplier<PluginImplementor> implementorFactory) { + super( + RelOptHelper.some(VertexDrel.class, DrillRel.DRILL_LOGICAL, + RelOptHelper.any(RelNode.class, convention)), + DrillRelFactories.LOGICAL_BUILDER, "EnumerableIntermediatePrelConverterRule" + convention); this.implementorFactory = implementorFactory; + this.inTrait = DrillRel.DRILL_LOGICAL; + this.outTrait = Prel.DRILL_PHYSICAL; } @Override - public RelNode convert(RelNode in) { - return new PluginIntermediatePrelRel( + public void onMatch(RelOptRuleCall call) { + VertexDrel in = call.rel(0); + RelNode intermediatePrel = new PluginIntermediatePrelRel( in.getCluster(), - in.getTraitSet().replace(getOutTrait()), + in.getTraitSet().replace(outTrait), in.getInput(0), implementorFactory); + call.transformTo(intermediatePrel); + } + + @Override + public boolean matches(RelOptRuleCall call) { + return super.matches(call) && call.rel(0).getTraitSet().contains(inTrait); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java new file mode 100644 index 0000000..8b042f4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java @@ -0,0 +1,32 @@ +package org.apache.drill.exec.store.plan.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelTrait; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.Project; +import org.apache.drill.exec.store.plan.PluginImplementor; +import org.apache.drill.exec.store.plan.rel.PluginJoinRel; +import org.apache.drill.exec.store.plan.rel.PluginProjectRel; + +/** + * Rule to convert a {@link Project} to a {@link PluginProjectRel}. + */ +public class PluginJoinRule extends PluginConverterRule { + + public PluginJoinRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) { + super(Join.class, in, out, "PluginProjectRule", pluginImplementor); + } + + @Override + public RelNode convert(RelNode rel) { + Join join = (Join) rel; + return new PluginJoinRel( + join.getCluster(), + join.getTraitSet().replace(getOutConvention()), + convert(join.getLeft(), getOutConvention()), + convert(join.getRight(), getOutConvention()), + join.getCondition(), + join.getJoinType()); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java index fea2276..e1ba189 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java @@ -2,24 +2,26 @@ package org.apache.drill.exec.store.plan.rule; import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelTrait; -import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.drill.exec.planner.common.DrillLimitRelBase; +import org.apache.drill.exec.store.plan.PluginImplementor; import org.apache.drill.exec.store.plan.rel.PluginLimitRel; public class PluginLimitRule extends PluginConverterRule { - public PluginLimitRule(RelTrait in, Convention out) { - super(DrillLimitRelBase.class, in, out, "PluginLimitRule"); + public PluginLimitRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) { + super(DrillLimitRelBase.class, in, out, "PluginLimitRule", pluginImplementor); } @Override public RelNode convert(RelNode rel) { DrillLimitRelBase sort = (DrillLimitRelBase) rel; - RelTraitSet traitSet = - sort.getTraitSet().replace(getOutConvention()); RelNode input = convert(sort.getInput(), sort.getInput().getTraitSet().replace(getOutConvention()).simplify()); - return new PluginLimitRel(rel.getCluster(), traitSet, input, - sort.getOffset(), sort.getFetch()); + return new PluginLimitRel( + rel.getCluster(), + sort.getTraitSet().replace(getOutConvention()), + input, + sort.getOffset(), + sort.getFetch()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java index d02bd1e..27ff94b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java @@ -2,9 +2,9 @@ package org.apache.drill.exec.store.plan.rule; import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelTrait; -import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Project; +import org.apache.drill.exec.store.plan.PluginImplementor; import org.apache.drill.exec.store.plan.rel.PluginProjectRel; /** @@ -12,16 +12,19 @@ import org.apache.drill.exec.store.plan.rel.PluginProjectRel; */ public class PluginProjectRule extends PluginConverterRule { - public PluginProjectRule(RelTrait in, Convention out) { - super(Project.class, in, out, "PluginProjectRule"); + public PluginProjectRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) { + super(Project.class, in, out, "PluginProjectRule", pluginImplementor); } @Override public RelNode convert(RelNode rel) { Project project = (Project) rel; - RelTraitSet traitSet = project.getTraitSet().replace(getOutConvention()); - return new PluginProjectRel(getOutConvention(), project.getCluster(), traitSet, - convert(project.getInput(), getOutConvention()), project.getProjects(), + return new PluginProjectRel( + getOutConvention(), + project.getCluster(), + project.getTraitSet().replace(getOutConvention()), + convert(project.getInput(), getOutConvention()), + project.getProjects(), project.getRowType()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java index 0378b0e..2e0a9f9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java @@ -2,9 +2,9 @@ package org.apache.drill.exec.store.plan.rule; import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelTrait; -import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Sort; +import org.apache.drill.exec.store.plan.PluginImplementor; import org.apache.drill.exec.store.plan.rel.PluginSortRel; /** @@ -12,17 +12,20 @@ import org.apache.drill.exec.store.plan.rel.PluginSortRel; */ public class PluginSortRule extends PluginConverterRule { - public PluginSortRule(RelTrait in, Convention out) { - super(Sort.class, in, out, "PluginSortRule"); + public PluginSortRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) { + super(Sort.class, in, out, "PluginSortRule", pluginImplementor); } @Override public RelNode convert(RelNode rel) { Sort sort = (Sort) rel; - RelTraitSet traitSet = sort.getTraitSet().replace(getOutConvention()) - .replace(sort.getCollation()); RelNode input = convert(sort.getInput(), sort.getInput().getTraitSet().replace(getOutConvention()).simplify()); - return new PluginSortRel(rel.getCluster(), traitSet, input, - sort.getCollation(), sort.offset, sort.fetch); + return new PluginSortRel( + rel.getCluster(), + sort.getTraitSet().replace(getOutConvention()).replace(sort.getCollation()), + input, + sort.getCollation(), + sort.offset, + sort.fetch); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java index c85f65e..4d058e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java @@ -3,40 +3,41 @@ package org.apache.drill.exec.store.plan.rule; import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelTrait; -import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Union; -import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.store.plan.PluginImplementor; import org.apache.drill.exec.store.plan.rel.PluginUnionRel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PluginUnionRule extends PluginConverterRule { + private static final Logger logger = LoggerFactory.getLogger(PluginUnionRule.class); - public PluginUnionRule(RelTrait in, Convention out) { - super(Union.class, in, out, "PluginUnionRule"); + public PluginUnionRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) { + super(Union.class, in, out, "PluginUnionRule", pluginImplementor); } @Override public RelNode convert(RelNode rel) { Union union = (Union) rel; - RelTraitSet traitSet = - union.getTraitSet().replace(getOutConvention()); try { return new PluginUnionRel( rel.getCluster(), - traitSet, + union.getTraitSet().replace(getOutConvention()), convertList(union.getInputs(), getOutConvention()), union.all, true); } catch (InvalidRelException e) { - throw new DrillRuntimeException(e); + logger.warn(e.getMessage()); + return null; } } - @Override - public boolean matches(RelOptRuleCall call) { - // allow converting for union all only, since Drill adds extra aggregation for union distinct, - // so we will convert both union all and aggregation later - return call.<Union>rel(0).all; - } +// @Override +// public boolean matches(RelOptRuleCall call) { +// // allow converting for union all only, since Drill adds extra aggregation for union distinct, +// // so we will convert both union all and aggregation later +// return call.<Union>rel(0).all && super.matches(call); +// } }
