This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch mongo in repository https://gitbox.apache.org/repos/asf/drill.git
commit a8bc68b61d212cb39476c0927fb90f641cb24a4c Author: Volodymyr Vysotskyi <[email protected]> AuthorDate: Mon Jul 19 19:11:59 2021 +0300 Incomplete changes for project --- .../exec/store/mongo/MongoAggregateUtils.java | 8 +- .../drill/exec/store/mongo/MongoFilterBuilder.java | 56 +++--- .../drill/exec/store/mongo/MongoGroupScan.java | 1 + .../drill/exec/store/mongo/MongoRecordReader.java | 8 +- .../drill/exec/store/mongo/MongoScanSpec.java | 21 ++- .../drill/exec/store/mongo/MongoSubScan.java | 14 +- .../common/{MongoCompareOp.java => MongoOp.java} | 30 +++- .../store/mongo/plan/MongoPluginImplementor.java | 55 +++--- .../drill/exec/store/mongo/plan/MongoRules.java | 194 ++++----------------- .../drill/exec/store/mongo/TestMongoQueries.java | 22 +++ 10 files changed, 184 insertions(+), 225 deletions(-) diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java index e196258..817644d 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java @@ -8,7 +8,7 @@ import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.validate.SqlValidatorUtil; -import org.apache.drill.exec.store.mongo.common.MongoCompareOp; +import org.apache.drill.exec.store.mongo.common.MongoOp; import org.bson.BsonArray; import org.bson.BsonDocument; import org.bson.BsonElement; @@ -42,7 +42,7 @@ public class MongoAggregateUtils { } static String quote(String s) { - return "'" + s + "'"; // TODO: handle embedded quotes + return "'" + s + "'"; } private static boolean needsQuote(String s) { @@ -116,9 +116,9 @@ public class MongoAggregateUtils { } else { assert args.size() == 1; String inName = inNames.get(args.get(0)); - expr = new BsonDocument(MongoCompareOp.COND.getCompareOp(), + expr = new BsonDocument(MongoOp.COND.getCompareOp(), new BsonArray(Arrays.asList( - new Document(MongoCompareOp.EQUAL.getCompareOp(), + new Document(MongoOp.EQUAL.getCompareOp(), new BsonArray(Arrays.asList( new BsonString(quote(inName)), BsonNull.VALUE))).toBsonDocument(), diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java index 8eecf00..5bea34e 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.mongo; -import java.io.IOException; import java.util.List; import org.apache.drill.common.FunctionNames; @@ -26,7 +25,7 @@ import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.visitors.AbstractExprVisitor; -import org.apache.drill.exec.store.mongo.common.MongoCompareOp; +import org.apache.drill.exec.store.mongo.common.MongoOp; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,21 +93,21 @@ public class MongoFilterBuilder extends List<LogicalExpression> args = op.args(); Document nodeScanSpec = null; String functionName = op.getName(); - for (int i = 0; i < args.size(); ++i) { + for (LogicalExpression arg : args) { switch (functionName) { - case FunctionNames.AND: - case FunctionNames.OR: - if (nodeScanSpec == null) { - nodeScanSpec = args.get(i).accept(this, null); - } else { - Document scanSpec = args.get(i).accept(this, null); - if (scanSpec != null) { - nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec); + case FunctionNames.AND: + case FunctionNames.OR: + if (nodeScanSpec == null) { + nodeScanSpec = arg.accept(this, null); } else { - allExpressionsConverted = false; + Document scanSpec = arg.accept(this, null); + if (scanSpec != null) { + nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec); + } else { + allExpressionsConverted = false; + } } - } - break; + break; } } return nodeScanSpec; @@ -160,50 +159,49 @@ public class MongoFilterBuilder extends } private Document createMongoScanSpec(String functionName, - SchemaPath field, Object fieldValue) throws ClassNotFoundException, - IOException { + SchemaPath field, Object fieldValue) { // extract the field name String fieldName = field.getRootSegmentPath(); - MongoCompareOp compareOp = null; + MongoOp compareOp = null; switch (functionName) { case FunctionNames.EQ: - compareOp = MongoCompareOp.EQUAL; + compareOp = MongoOp.EQUAL; break; case FunctionNames.NE: - compareOp = MongoCompareOp.NOT_EQUAL; + compareOp = MongoOp.NOT_EQUAL; break; case FunctionNames.GE: - compareOp = MongoCompareOp.GREATER_OR_EQUAL; + compareOp = MongoOp.GREATER_OR_EQUAL; break; case FunctionNames.GT: - compareOp = MongoCompareOp.GREATER; + compareOp = MongoOp.GREATER; break; case FunctionNames.LE: - compareOp = MongoCompareOp.LESS_OR_EQUAL; + compareOp = MongoOp.LESS_OR_EQUAL; break; case FunctionNames.LT: - compareOp = MongoCompareOp.LESS; + compareOp = MongoOp.LESS; break; case FunctionNames.IS_NULL: case "isNull": case "is null": - compareOp = MongoCompareOp.IFNULL; + compareOp = MongoOp.IFNULL; break; case FunctionNames.IS_NOT_NULL: case "isNotNull": case "is not null": - compareOp = MongoCompareOp.IFNOTNULL; + compareOp = MongoOp.IFNOTNULL; break; } if (compareOp != null) { Document queryFilter = new Document(); - if (compareOp == MongoCompareOp.IFNULL) { + if (compareOp == MongoOp.IFNULL) { queryFilter.put(fieldName, - new Document(MongoCompareOp.EQUAL.getCompareOp(), null)); - } else if (compareOp == MongoCompareOp.IFNOTNULL) { + new Document(MongoOp.EQUAL.getCompareOp(), null)); + } else if (compareOp == MongoOp.IFNOTNULL) { queryFilter.put(fieldName, - new Document(MongoCompareOp.NOT_EQUAL.getCompareOp(), null)); + new Document(MongoOp.NOT_EQUAL.getCompareOp(), null)); } else { queryFilter.put(fieldName, new Document(compareOp.getCompareOp(), fieldValue)); diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java index bc3817e..12fd384 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java @@ -460,6 +460,7 @@ public class MongoGroupScan extends AbstractGroupScan implements .setMinFilters(chunkInfo.getMinFilters()) .setMaxFilters(chunkInfo.getMaxFilters()) .setFilter(scanSpec.getFilters()) + .setFields(scanSpec.getFields()) .setDbName(scanSpec.getDbName()) .setCollectionName(scanSpec.getCollectionName()) .setHosts(chunkInfo.getChunkLocList()); diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java index 5cb007f..820eb8a 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java @@ -70,7 +70,7 @@ public class MongoRecordReader extends AbstractRecordReader { private Document filters; private List<Bson> operations; - private final Document fields; + private Document fields; private final FragmentContext fragmentContext; @@ -85,9 +85,9 @@ public class MongoRecordReader extends AbstractRecordReader { public MongoRecordReader(BaseMongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns, FragmentContext context, MongoStoragePlugin plugin) { - fields = new Document(); +// fields = new Document(); // exclude _id field, if not mentioned by user. - fields.put(DrillMongoConstants.ID, 0); +// fields.put(DrillMongoConstants.ID, 0); setColumns(projectedColumns); fragmentContext = context; this.plugin = plugin; @@ -100,6 +100,8 @@ public class MongoRecordReader extends AbstractRecordReader { shardedMongoSubScanSpec.getMinFilters(), shardedMongoSubScanSpec.getMaxFilters()); buildFilters(shardedMongoSubScanSpec.getFilter(), mergedFilters); + + fields = shardedMongoSubScanSpec.getFields(); } enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val; enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val; diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java index 17b548b..da10a54 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java @@ -33,20 +33,26 @@ public class MongoScanSpec { private Document filters; + private Document fields; + private List<Bson> operations = new ArrayList<>(); - @JsonCreator - public MongoScanSpec(@JsonProperty("dbName") String dbName, - @JsonProperty("collectionName") String collectionName) { + public MongoScanSpec(String dbName, + String collectionName) { this.dbName = dbName; this.collectionName = collectionName; } - public MongoScanSpec(String dbName, String collectionName, - Document filters, List<Bson> operations) { + @JsonCreator + public MongoScanSpec(@JsonProperty("dbName") String dbName, + @JsonProperty("collectionName") String collectionName, + @JsonProperty("filters") Document filters, + @JsonProperty("fields") Document fields, + @JsonProperty("operations") List<Bson> operations) { this.dbName = dbName; this.collectionName = collectionName; this.filters = filters; + this.fields = fields; this.operations = operations; } @@ -62,6 +68,10 @@ public class MongoScanSpec { return filters; } + public Document getFields() { + return fields; + } + public List<Bson> getOperations() { return operations; } @@ -72,6 +82,7 @@ public class MongoScanSpec { .field("dbName", dbName) .field("collectionName", collectionName) .field("filters", filters) + .field("fields", fields) .field("operations", operations) .toString(); } diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java index 692a939..ef31f25 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java @@ -128,6 +128,7 @@ public class MongoSubScan extends AbstractBase implements SubScan { protected Map<String, Object> minFilters; protected Map<String, Object> maxFilters; protected Document filter; + protected Document fields; @JsonCreator public ShardedMongoSubScanSpec(@JsonProperty("dbName") String dbName, @@ -135,11 +136,13 @@ public class MongoSubScan extends AbstractBase implements SubScan { @JsonProperty("hosts") List<String> hosts, @JsonProperty("minFilters") Map<String, Object> minFilters, @JsonProperty("maxFilters") Map<String, Object> maxFilters, - @JsonProperty("filters") Document filters) { + @JsonProperty("filters") Document filters, + @JsonProperty("fields") Document fields) { super(dbName, collectionName, hosts); this.minFilters = minFilters; this.maxFilters = maxFilters; this.filter = filters; + this.fields = fields; } ShardedMongoSubScanSpec() { @@ -172,6 +175,15 @@ public class MongoSubScan extends AbstractBase implements SubScan { return this; } + public Document getFields() { + return fields; + } + + public ShardedMongoSubScanSpec setFields(Document fields) { + this.fields = fields; + return this; + } + @Override public String toString() { return new PlanStringBuilder(this) diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java similarity index 67% rename from contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java rename to contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java index ef89bfb..55f8cc5 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java @@ -17,14 +17,30 @@ */ package org.apache.drill.exec.store.mongo.common; -public enum MongoCompareOp { - EQUAL("$eq"), NOT_EQUAL("$ne"), GREATER_OR_EQUAL("$gte"), GREATER("$gt"), LESS_OR_EQUAL( - "$lte"), LESS("$lt"), IN("$in"), AND("$and"), OR("$or"), REGEX("$regex"), OPTIONS( - "$options"), PROJECT("$project"), COND("$cond"), IFNULL("$ifNull"), IFNOTNULL( - "$ifNotNull"), SUM("$sum"), GROUP_BY("$group"), EXISTS("$exists"); - private String compareOp; +public enum MongoOp { + EQUAL("$eq"), + NOT_EQUAL("$ne"), + GREATER_OR_EQUAL("$gte"), + GREATER("$gt"), + LESS_OR_EQUAL("$lte"), + LESS("$lt"), + IN("$in"), + AND("$and"), + OR("$or"), + NOT("$not"), + REGEX("$regex"), + OPTIONS("$options"), + PROJECT("$project"), + COND("$cond"), + IFNULL("$ifNull"), + IFNOTNULL("$ifNotNull"), + SUM("$sum"), + GROUP_BY("$group"), + EXISTS("$exists"); - MongoCompareOp(String compareOp) { + private final String compareOp; + + MongoOp(String compareOp) { this.compareOp = compareOp; } diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java index fad7c25..c359c77 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java @@ -1,10 +1,13 @@ package org.apache.drill.exec.store.mongo.plan; import com.mongodb.client.model.Aggregates; +import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Pair; import org.apache.calcite.util.Util; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; @@ -26,6 +29,10 @@ import org.apache.drill.exec.store.plan.rel.PluginUnionRel; import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan; import org.apache.drill.exec.util.Utilities; import org.bson.BsonDocument; +import org.bson.BsonElement; +import org.bson.BsonInt32; +import org.bson.BsonString; +import org.bson.BsonValue; import org.bson.Document; import org.bson.conversions.Bson; @@ -40,6 +47,7 @@ public class MongoPluginImplementor implements PluginImplementor { private List<Bson> operations; private Document filters; private List<SchemaPath> columns; + private Document fields; private boolean runAggregate; @@ -90,24 +98,30 @@ public class MongoPluginImplementor implements PluginImplementor { public void implement(PluginProjectRel project) throws IOException { visitChild(project.getInput()); -// final MongoRules.RexToMongoTranslator translator = -// new MongoRules.RexToMongoTranslator( -// (JavaTypeFactory) project.getCluster().getTypeFactory(), -// MongoRules.mongoFieldNames(project.getInput().getRowType())); -// final List<String> items = new ArrayList<>(); -// for (Pair<RexNode, String> pair : project.getNamedProjects()) { -// final String name = pair.right; -// final String expr = pair.left.accept(translator); -// items.add(expr.equals("'$" + name + "'") -// ? MongoRules.maybeQuote(name) + ": 1" -// : MongoRules.maybeQuote(name) + ": " + expr); -// } -// final String findString = Util.toString(items, "{", ", ", "}"); -// final String aggregateString = "{$project: " + findString + "}"; -// final Pair<String, String> op = Pair.of(findString, aggregateString); + MongoRules.RexToMongoTranslator translator = + new MongoRules.RexToMongoTranslator( + (JavaTypeFactory) project.getCluster().getTypeFactory(), + MongoRules.mongoFieldNames(project.getInput().getRowType())); + List<BsonElement> items = new ArrayList<>(); + for (Pair<RexNode, String> pair : project.getNamedProjects()) { + String name = pair.right; + BsonValue expr = pair.left.accept(translator); + items.add(expr.equals(new BsonString("$" + name)) + ? new BsonElement(MongoRules.maybeQuote(name), new BsonInt32(1)) + : new BsonElement(MongoRules.maybeQuote(name), expr)); + } + BsonDocument projection = Aggregates.project(new BsonDocument(items)).toBsonDocument(); + if (runAggregate) { + operations.add(projection); + } else { + List<String> outNames = MongoAggregateUtils.mongoFieldNames(project.getRowType()); + this.columns = outNames.stream() + .map(SchemaPath::getSimplePath) + .collect(Collectors.toList()); + } // implementor.add(op.left, op.right); - List<String> outNames = MongoAggregateUtils.mongoFieldNames(project.getRowType()); +// List<String> outNames = MongoAggregateUtils.mongoFieldNames(project.getRowType()); // Document fields = new Document(); // fields.put(DrillMongoConstants.ID, 0); // List<String> inNames = MongoAggregateUtils.mongoFieldNames(project.getInput().getRowType()); @@ -118,9 +132,9 @@ public class MongoPluginImplementor implements PluginImplementor { // // operations.add(Aggregates.project(fields).toBsonDocument()); - this.columns = outNames.stream() - .map(SchemaPath::getSimplePath) - .collect(Collectors.toList()); +// this.columns = outNames.stream() +// .map(SchemaPath::getSimplePath) +// .collect(Collectors.toList()); } @Override @@ -187,6 +201,7 @@ public class MongoPluginImplementor implements PluginImplementor { groupScan = (MongoGroupScan) Utilities.getDrillTable(scan.getTable()).getGroupScan(); operations = new ArrayList<>(this.groupScan.getScanSpec().getOperations()); filters = groupScan.getScanSpec().getFilters(); + fields = groupScan.getScanSpec().getFields(); columns = groupScan.getColumns(); } @@ -198,7 +213,7 @@ public class MongoPluginImplementor implements PluginImplementor { @Override public GroupScan getPhysicalOperator() throws IOException { MongoScanSpec scanSpec = groupScan.getScanSpec(); - MongoScanSpec newSpec = new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(), filters, operations); + MongoScanSpec newSpec = new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(), filters, fields, operations); return new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(), newSpec, columns, runAggregate); } diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java index fddfa7c..7f0daf3 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java @@ -19,8 +19,6 @@ package org.apache.drill.exec.store.mongo.plan; import org.apache.calcite.adapter.enumerable.RexImpTable; import org.apache.calcite.adapter.enumerable.RexToLixTranslator; import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; @@ -32,75 +30,23 @@ import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.validate.SqlValidatorUtil; -import org.apache.calcite.util.Bug; -import org.apache.calcite.util.Util; -import org.apache.drill.exec.planner.logical.DrillRel; -import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule; -import org.apache.drill.exec.store.plan.rule.PluginIntermediatePrelConverterRule; -import org.apache.drill.exec.store.plan.rule.PluginAggregateRule; -import org.apache.drill.exec.store.plan.rule.PluginFilterRule; -import org.apache.drill.exec.store.plan.rule.PluginLimitRule; -import org.apache.drill.exec.store.plan.rule.PluginProjectRule; -import org.apache.drill.exec.store.plan.rule.PluginSortRule; -import org.apache.drill.exec.store.plan.rule.PluginUnionRule; +import org.apache.drill.exec.store.mongo.common.MongoOp; +import org.bson.BsonArray; +import org.bson.BsonDocument; +import org.bson.BsonInt32; +import org.bson.BsonNull; +import org.bson.BsonString; +import org.bson.BsonValue; import java.util.AbstractList; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class MongoRules { - public List<RelOptRule> sortRules(Convention out) { - return Arrays.asList( - new PluginSortRule(Convention.NONE, out), - new PluginSortRule(DrillRel.DRILL_LOGICAL, out) - ); - } - - public List<RelOptRule> limitRules(Convention out) { - return Arrays.asList( - new PluginLimitRule(Convention.NONE, out), - new PluginLimitRule(DrillRel.DRILL_LOGICAL, out) - ); - } - - public List<RelOptRule> filterRules(Convention out) { - return Arrays.asList( - new PluginFilterRule(Convention.NONE, out), - new PluginFilterRule(DrillRel.DRILL_LOGICAL, out) - ); - } - - public List<RelOptRule> projectRules(Convention out) { - return Arrays.asList( - new PluginProjectRule(Convention.NONE, out), - new PluginProjectRule(DrillRel.DRILL_LOGICAL, out) - ); - } - - public List<RelOptRule> aggregateRules(Convention out) { - return Arrays.asList( - new PluginAggregateRule(Convention.NONE, out), - new PluginAggregateRule(DrillRel.DRILL_LOGICAL, out) - ); - } - - public List<RelOptRule> unionRules(Convention out) { - return Arrays.asList( - new PluginUnionRule(Convention.NONE, out), - new PluginUnionRule(DrillRel.DRILL_LOGICAL, out) - ); - } - - public RelOptRule vertexRule(Convention out) { - return new VertexDrelConverterRule(out); - } - - public static final RelOptRule PREL_CONVERTER_INSTANCE = new PluginIntermediatePrelConverterRule(MongoPluginImplementor::new); - /** Returns 'string' if it is a call to item['string'], null otherwise. */ public static String isItem(RexCall call) { if (call.getOperator() != SqlStdOperatorTable.ITEM) { @@ -156,7 +102,7 @@ public class MongoRules { /** Translator from {@link RexNode} to strings in MongoDB's expression * language. */ - static class RexToMongoTranslator extends RexVisitorImpl<String> { + static class RexToMongoTranslator extends RexVisitorImpl<BsonValue> { private final JavaTypeFactory typeFactory; private final List<String> inFields; @@ -171,16 +117,16 @@ public class MongoRules { MONGO_OPERATORS.put(SqlStdOperatorTable.PLUS, "$add"); MONGO_OPERATORS.put(SqlStdOperatorTable.MINUS, "$subtract"); // Boolean - MONGO_OPERATORS.put(SqlStdOperatorTable.AND, "$and"); - MONGO_OPERATORS.put(SqlStdOperatorTable.OR, "$or"); - MONGO_OPERATORS.put(SqlStdOperatorTable.NOT, "$not"); + MONGO_OPERATORS.put(SqlStdOperatorTable.AND, MongoOp.AND.getCompareOp()); + MONGO_OPERATORS.put(SqlStdOperatorTable.OR, MongoOp.OR.getCompareOp()); + MONGO_OPERATORS.put(SqlStdOperatorTable.NOT, MongoOp.NOT.getCompareOp()); // Comparison - MONGO_OPERATORS.put(SqlStdOperatorTable.EQUALS, "$eq"); - MONGO_OPERATORS.put(SqlStdOperatorTable.NOT_EQUALS, "$ne"); - MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN, "$gt"); - MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, "$gte"); - MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN, "$lt"); - MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, "$lte"); + MONGO_OPERATORS.put(SqlStdOperatorTable.EQUALS, MongoOp.EQUAL.getCompareOp()); + MONGO_OPERATORS.put(SqlStdOperatorTable.NOT_EQUALS, MongoOp.NOT_EQUAL.getCompareOp()); + MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN, MongoOp.GREATER.getCompareOp()); + MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, MongoOp.GREATER_OR_EQUAL.getCompareOp()); + MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN, MongoOp.LESS.getCompareOp()); + MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, MongoOp.LESS_OR_EQUAL.getCompareOp()); } protected RexToMongoTranslator(JavaTypeFactory typeFactory, @@ -190,43 +136,45 @@ public class MongoRules { this.inFields = inFields; } - @Override public String visitLiteral(RexLiteral literal) { + @Override + public BsonValue visitLiteral(RexLiteral literal) { if (literal.getValue() == null) { - return "null"; + return BsonNull.VALUE; } - return "{$literal: " - + RexToLixTranslator.translateLiteral(literal, literal.getType(), - typeFactory, RexImpTable.NullAs.NOT_POSSIBLE) - + "}"; + return new BsonDocument("$literal", new BsonString( + RexToLixTranslator.translateLiteral(literal, literal.getType(), + typeFactory, RexImpTable.NullAs.NOT_POSSIBLE).toString())); } - @Override public String visitInputRef(RexInputRef inputRef) { - return maybeQuote( + @Override + public BsonValue visitInputRef(RexInputRef inputRef) { + return new BsonString( "$" + inFields.get(inputRef.getIndex())); } - @Override public String visitCall(RexCall call) { + @Override + public BsonValue visitCall(RexCall call) { String name = isItem(call); if (name != null) { - return "'$" + name + "'"; + return new BsonString("'$" + name + "'"); } - final List<String> strings = new ArrayList<>();//visitList(call.operands); + List<BsonValue> strings = call.operands.stream() + .map(operand -> operand.accept(this)) + .collect(Collectors.toList()); + if (call.getKind() == SqlKind.CAST) { return strings.get(0); } String stdOperator = MONGO_OPERATORS.get(call.getOperator()); if (stdOperator != null) { - return "{" + stdOperator + ": [" + Util.commaList(strings) + "]}"; + return new BsonDocument(stdOperator, new BsonArray(strings)); } if (call.getOperator() == SqlStdOperatorTable.ITEM) { final RexNode op1 = call.operands.get(1); if (op1 instanceof RexLiteral && op1.getType().getSqlTypeName() == SqlTypeName.INTEGER) { - if (!Bug.CALCITE_194_FIXED) { - return "'" + stripQuotes(strings.get(0)) + "[" - + ((RexLiteral) op1).getValue2() + "]'"; - } - return strings.get(0) + "[" + strings.get(1) + "]"; + return new BsonDocument("$arrayElemAt", new BsonArray( + Arrays.asList(strings.get(0), new BsonInt32(((RexLiteral) op1).getValueAs(Integer.class))))); } } if (call.getOperator() == SqlStdOperatorTable.CASE) { @@ -253,7 +201,7 @@ public class MongoRules { } } sb.append(finish); - return sb.toString(); + return BsonDocument.parse(sb.toString()); } throw new IllegalArgumentException("Translation of " + call + " is not supported by MongoProject"); @@ -435,72 +383,6 @@ public class MongoRules { */ - -// /** -// * Rule to convert an {@link org.apache.calcite.rel.logical.Union} to a -// * {@link MongoUnionRel}. -// */ -// public static class MongoUnionRule -// extends MongoConverterRule { -// private MongoUnionRule(MongoConvention out) { -// super( -// Union.class, -// Convention.NONE, -// out, -// "MongoUnionRule"); -// } -// -// public RelNode convert(RelNode rel) { -// final Union union = (Union) rel; -// final RelTraitSet traitSet = -// union.getTraitSet().replace(out); -// return new MongoUnionRel( -// rel.getCluster(), -// traitSet, -// convertList(union.getInputs(), traitSet), -// union.all); -// } -// } -// -// public static class MongoUnionRel -// extends Union -// implements MongoRel { -// public MongoUnionRel( -// RelOptCluster cluster, -// RelTraitSet traitSet, -// List<RelNode> inputs, -// boolean all) { -// super(cluster, traitSet, inputs, all); -// } -// -// public MongoUnionRel copy( -// RelTraitSet traitSet, List<RelNode> inputs, boolean all) { -// return new MongoUnionRel(getCluster(), traitSet, inputs, all); -// } -// -// @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { -// return super.computeSelfCost(planner).multiplyBy(.1); -// } -// -// public SqlString implement(MongoImplementor implementor) { -// return setOpSql(this, implementor, "UNION"); -// } -// } -// -// private static SqlString setOpSql( -// SetOp setOpRel, MongoImplementor implementor, String op) { -// final SqlBuilder buf = new SqlBuilder(implementor.dialect); -// for (Ord<RelNode> input : Ord.zip(setOpRel.getInputs())) { -// if (input.i > 0) { -// implementor.newline(buf) -// .append(op + (setOpRel.all ? " ALL " : "")); -// implementor.newline(buf); -// } -// buf.append(implementor.visitChild(input.i, input.e)); -// } -// return buf.toSqlString(); -// } - /* /** * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalIntersect} diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java index 090bad0..e8f698b 100644 --- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java +++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java @@ -249,4 +249,26 @@ public class TestMongoQueries extends MongoTestBase { .baselineValues("0005", "Apple Fritter") .go(); } + + @Test + public void testProjectPushDown() throws Exception { + String query = "select t.id * t.id as c from mongo.%s.`%s` t"; + + queryBuilder() + .sql(query, DONUTS_DB, DONUTS_COLLECTION) + .planMatcher() + .include("MongoGroupScan.*project.*multiply") + .match(); + + testBuilder() + .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION) + .unOrdered() + .baselineColumns("c") + .baselineValues(1) + .baselineValues(4) + .baselineValues(9) + .baselineValues(16) + .baselineValues(25) + .go(); + } }
