This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch mongo
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 40ff39dfb11d4f5bd21cf572c883c41726e49cb5
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Thu Aug 5 19:06:00 2021 +0300

    Add join rel, fix project
---
 .../exec/store/mongo/MongoAggregateUtils.java      |  15 ++
 .../drill/exec/store/mongo/MongoStoragePlugin.java |   5 +-
 .../store/mongo/plan/MongoPluginImplementor.java   |  97 +++++++---
 .../store/mongo/plan/MongoPluginRulesProvider.java |  76 --------
 .../drill/exec/store/mongo/plan/MongoTable.java    | 197 ---------------------
 .../store/mongo/plan/RexToMongoTranslator.java     | 163 ++++++++++-------
 .../exec/store/mongo/TestMongoLimitPushDown.java   |   8 +-
 .../exec/store/mongo/TestMongoProjectPushDown.java |  12 +-
 .../drill/exec/store/PlannableStoragePlugin.java   |  14 +-
 .../drill/exec/store/PluginRulesProvider.java      |   1 +
 .../drill/exec/store/PluginRulesProviderImpl.java  |  97 ++++++++++
 .../exec/store/plan/AbstractPluginImplementor.java | 113 ++++++++++++
 .../drill/exec/store/plan/PluginImplementor.java   |  27 ++-
 .../exec/store/plan/rel/PluginAggregateRel.java    |  33 +---
 .../drill/exec/store/plan/rel/PluginFilterRel.java |   5 +
 .../drill/exec/store/plan/rel/PluginJoinRel.java   |  34 ++++
 .../drill/exec/store/plan/rel/PluginLimitRel.java  |   5 +
 .../exec/store/plan/rel/PluginProjectRel.java      |   5 +
 .../drill/exec/store/plan/rel/PluginRel.java       |   1 +
 .../drill/exec/store/plan/rel/PluginSortRel.java   |   5 +
 .../drill/exec/store/plan/rel/PluginUnionRel.java  |   5 +
 .../store/plan/rel/StoragePluginTableScan.java     |   5 +
 .../exec/store/plan/rule/PluginAggregateRule.java  |  31 ++--
 .../exec/store/plan/rule/PluginConverterRule.java  |  40 ++++-
 .../exec/store/plan/rule/PluginFilterRule.java     |   9 +-
 .../rule/PluginIntermediatePrelConverterRule.java  |  35 ++--
 .../drill/exec/store/plan/rule/PluginJoinRule.java |  32 ++++
 .../exec/store/plan/rule/PluginLimitRule.java      |  16 +-
 .../exec/store/plan/rule/PluginProjectRule.java    |  15 +-
 .../drill/exec/store/plan/rule/PluginSortRule.java |  17 +-
 .../exec/store/plan/rule/PluginUnionRule.java      |  29 +--
 31 files changed, 675 insertions(+), 472 deletions(-)

diff --git 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
index e362707..9f551f8 100644
--- 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
+++ 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
@@ -159,4 +159,19 @@ public class MongoAggregateUtils {
     }
     return null;
   }
+
+  public static boolean supportsAggregation(AggregateCall aggregateCall) {
+    String name = aggregateCall.getAggregation().getName();
+    return name.equals(SqlStdOperatorTable.COUNT.getName())
+      || name.equals(SqlStdOperatorTable.SUM.getName())
+      || name.equals(SqlStdOperatorTable.SUM0.getName())
+      || name.equals(SqlStdOperatorTable.MIN.getName())
+      || name.equals(SqlStdOperatorTable.MAX.getName())
+      || name.equals(SqlStdOperatorTable.AVG.getName())
+      || name.equals(SqlStdOperatorTable.FIRST.getName())
+      || name.equals(SqlStdOperatorTable.LAST.getName())
+      || name.equals(SqlStdOperatorTable.STDDEV.getName())
+      || name.equals(SqlStdOperatorTable.STDDEV_SAMP.getName())
+      || name.equals(SqlStdOperatorTable.STDDEV_POP.getName());
+  }
 }
diff --git 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index 6262711..df0588d 100644
--- 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -36,7 +36,8 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.PlannableStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePlugin;
-import org.apache.drill.exec.store.mongo.plan.MongoPluginRulesProvider;
+import org.apache.drill.exec.store.mongo.plan.MongoPluginImplementor;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
 import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory;
 import org.apache.drill.exec.store.plan.rel.PluginRel;
 import org.apache.drill.exec.store.security.HadoopCredentialsProvider;
@@ -82,7 +83,7 @@ public class MongoStoragePlugin extends 
PlannableStoragePlugin implements Storag
   private static MongoStoragePluginConfigs mongoStoragePluginBuilder(String 
name) {
     Convention convention = new Convention.Impl("MONGO." + name, 
PluginRel.class);
     return new MongoStoragePluginConfigs()
-        .rulesProvider(new MongoPluginRulesProvider(convention))
+        .rulesProvider(new PluginRulesProviderImpl(convention, 
MongoPluginImplementor::new))
         .supportsProjectPushdown(true)
         .supportsSortPushdown(true)
         .supportsAggregatePushdown(true)
diff --git 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
index 5561047..8680c6d 100644
--- 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
+++ 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
@@ -4,15 +4,22 @@ import com.mongodb.client.model.Aggregates;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.Util;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
 import org.apache.drill.exec.planner.logical.DrillOptiq;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.physical.PrelUtil;
@@ -20,7 +27,7 @@ import org.apache.drill.exec.store.mongo.MongoAggregateUtils;
 import org.apache.drill.exec.store.mongo.MongoFilterBuilder;
 import org.apache.drill.exec.store.mongo.MongoGroupScan;
 import org.apache.drill.exec.store.mongo.MongoScanSpec;
-import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
 import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
 import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
 import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
@@ -43,7 +50,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class MongoPluginImplementor implements PluginImplementor {
+public class MongoPluginImplementor extends AbstractPluginImplementor {
   private MongoGroupScan groupScan;
   private List<Bson> operations;
   private Document filters;
@@ -88,11 +95,11 @@ public class MongoPluginImplementor implements 
PluginImplementor {
 
     if (limit.getOffset() != null) {
       operations.add(
-          Aggregates.skip(((BigDecimal) ((RexLiteral) 
limit.getOffset()).getValue()).intValue()).toBsonDocument());
+          Aggregates.skip(rexLiteralIntValue((RexLiteral) 
limit.getOffset())).toBsonDocument());
     }
     if (limit.getFetch() != null) {
       operations.add(
-          Aggregates.limit(((BigDecimal) ((RexLiteral) 
limit.getFetch()).getValue()).intValue()).toBsonDocument());
+          Aggregates.limit(rexLiteralIntValue((RexLiteral) 
limit.getFetch())).toBsonDocument());
     }
   }
 
@@ -137,43 +144,34 @@ public class MongoPluginImplementor implements 
PluginImplementor {
     visitChild(sort.getInput());
 
     if (!sort.collation.getFieldCollations().isEmpty()) {
-      final List<String> keys = new ArrayList<>();
-      final List<RelDataTypeField> fields = sort.getRowType().getFieldList();
+      BsonDocument sortKeys = new BsonDocument();
+      List<RelDataTypeField> fields = sort.getRowType().getFieldList();
       for (RelFieldCollation fieldCollation : 
sort.collation.getFieldCollations()) {
-        final String name =
-            fields.get(fieldCollation.getFieldIndex()).getName();
-        keys.add(name + ": " + direction(fieldCollation));
-        if (false) {
-          // TODO: NULLS FIRST and NULLS LAST
-          switch (fieldCollation.nullDirection) {
-            case FIRST:
-              break;
-            case LAST:
-              break;
-            default:
-              break;
-          }
-        }
+        String name = fields.get(fieldCollation.getFieldIndex()).getName();
+        sortKeys.put(name, new BsonInt32(direction(fieldCollation)));
       }
 
-      operations.add(
-          Aggregates.sort(BsonDocument.parse(Util.toString(keys, "{", ", ", 
"}"))).toBsonDocument());
+      operations.add(Aggregates.sort(sortKeys).toBsonDocument());
     }
     if (sort.offset != null) {
       operations.add(
-          Aggregates.skip(((BigDecimal) ((RexLiteral) 
sort.offset).getValue()).intValue()).toBsonDocument());
+          Aggregates.skip(rexLiteralIntValue((RexLiteral) 
sort.offset)).toBsonDocument());
     }
     if (sort.fetch != null) {
       operations.add(
-          Aggregates.limit(((BigDecimal) ((RexLiteral) 
sort.fetch).getValue()).intValue()).toBsonDocument());
+          Aggregates.limit(rexLiteralIntValue((RexLiteral) 
sort.fetch)).toBsonDocument());
     }
   }
 
+  private int rexLiteralIntValue(RexLiteral offset) {
+    return ((BigDecimal) offset.getValue()).intValue();
+  }
+
   @Override
   public void implement(PluginUnionRel union) throws IOException {
     runAggregate = true;
 
-    MongoPluginImplementor childImplementor = copy();
+    MongoPluginImplementor childImplementor = new MongoPluginImplementor();
     childImplementor.runAggregate = true;
 
     boolean firstProcessed = false;
@@ -199,8 +197,51 @@ public class MongoPluginImplementor implements 
PluginImplementor {
   }
 
   @Override
-  public MongoPluginImplementor copy() {
-    return new MongoPluginImplementor();
+  public boolean canImplement(Aggregate aggregate) {
+    return aggregate.getGroupType() == Aggregate.Group.SIMPLE
+      && aggregate.getAggCallList().stream()
+        .noneMatch(AggregateCall::isDistinct)
+      && aggregate.getAggCallList().stream()
+        .allMatch(MongoAggregateUtils::supportsAggregation);
+  }
+
+  @Override
+  public boolean canImplement(Filter filter) {
+    LogicalExpression conditionExp = DrillOptiq.toDrill(
+      new 
DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+      filter.getInput(),
+      filter.getCondition());
+    MongoFilterBuilder filterBuilder = new MongoFilterBuilder(conditionExp);
+    filterBuilder.parseTree();
+    return filterBuilder.isAllExpressionsConverted();
+  }
+
+  @Override
+  public boolean canImplement(DrillLimitRelBase limit) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Project project) {
+    return project.getProjects().stream()
+        .allMatch(RexToMongoTranslator::supportsExpression);
+  }
+
+  @Override
+  public boolean canImplement(Sort sort) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Union union) {
+    // allow converting for union all only, since Drill adds extra aggregation 
for union distinct,
+    // so we will convert both union all and aggregation later
+    return union.all;
+  }
+
+  @Override
+  public boolean canImplement(TableScan scan) {
+    return true;
   }
 
   @Override
diff --git 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginRulesProvider.java
 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginRulesProvider.java
deleted file mode 100644
index 06319b6..0000000
--- 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginRulesProvider.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.drill.exec.store.mongo.plan;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.drill.exec.planner.logical.DrillRel;
-import org.apache.drill.exec.store.PluginRulesProvider;
-import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
-import org.apache.drill.exec.store.plan.rule.PluginAggregateRule;
-import org.apache.drill.exec.store.plan.rule.PluginFilterRule;
-import 
org.apache.drill.exec.store.plan.rule.PluginIntermediatePrelConverterRule;
-import org.apache.drill.exec.store.plan.rule.PluginLimitRule;
-import org.apache.drill.exec.store.plan.rule.PluginProjectRule;
-import org.apache.drill.exec.store.plan.rule.PluginSortRule;
-import org.apache.drill.exec.store.plan.rule.PluginUnionRule;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class MongoPluginRulesProvider implements PluginRulesProvider {
-  private final Convention convention;
-
-  public MongoPluginRulesProvider(Convention convention) {
-    this.convention = convention;
-  }
-
-  public List<RelOptRule> sortRules() {
-    return Arrays.asList(
-        new PluginSortRule(Convention.NONE, convention),
-        new PluginSortRule(DrillRel.DRILL_LOGICAL, convention)
-    );
-  }
-
-  public List<RelOptRule> limitRules() {
-    return Arrays.asList(
-        new PluginLimitRule(Convention.NONE, convention),
-        new PluginLimitRule(DrillRel.DRILL_LOGICAL, convention)
-    );
-  }
-
-  public List<RelOptRule> filterRules() {
-    return Arrays.asList(
-        new PluginFilterRule(Convention.NONE, convention),
-        new PluginFilterRule(DrillRel.DRILL_LOGICAL, convention)
-    );
-  }
-
-  public List<RelOptRule> projectRules() {
-    return Arrays.asList(
-        new PluginProjectRule(Convention.NONE, convention),
-        new PluginProjectRule(DrillRel.DRILL_LOGICAL, convention)
-    );
-  }
-
-  public List<RelOptRule> aggregateRules() {
-    return Arrays.asList(
-        new PluginAggregateRule(Convention.NONE, convention),
-        new PluginAggregateRule(DrillRel.DRILL_LOGICAL, convention)
-    );
-  }
-
-  public List<RelOptRule> unionRules() {
-    return Arrays.asList(
-        new PluginUnionRule(Convention.NONE, convention),
-        new PluginUnionRule(DrillRel.DRILL_LOGICAL, convention)
-    );
-  }
-
-  public RelOptRule vertexRule() {
-    return new VertexDrelConverterRule(convention);
-  }
-
-  @Override
-  public RelOptRule prelConverterRule() {
-    return new 
PluginIntermediatePrelConverterRule(MongoPluginImplementor::new);
-  }
-}
diff --git 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoTable.java
 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoTable.java
deleted file mode 100644
index 68e01e9..0000000
--- 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoTable.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.mongo.plan;
-
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import org.apache.calcite.adapter.java.AbstractQueryableTable;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.linq4j.QueryProvider;
-import org.apache.calcite.linq4j.Queryable;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.TranslatableTable;
-import org.apache.calcite.schema.impl.AbstractTableQueryable;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.bson.BsonDocument;
-import org.bson.conversions.Bson;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Table based on a MongoDB collection.
- */
-public class MongoTable extends AbstractQueryableTable
-    implements TranslatableTable {
-  private final String collectionName;
-
-  /** Creates a MongoTable. */
-  MongoTable(String collectionName) {
-    super(Object[].class);
-    this.collectionName = collectionName;
-  }
-
-  @Override public String toString() {
-    return "MongoTable {" + collectionName + "}";
-  }
-
-  @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-    final RelDataType mapType =
-        typeFactory.createMapType(
-            typeFactory.createSqlType(SqlTypeName.VARCHAR),
-            typeFactory.createTypeWithNullability(
-                typeFactory.createSqlType(SqlTypeName.ANY), true));
-    return typeFactory.builder().add("_MAP", mapType).build();
-  }
-
-  @Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
-      SchemaPlus schema, String tableName) {
-    return new MongoQueryable<>(queryProvider, schema, this, tableName);
-  }
-
-  @Override public RelNode toRel(
-      RelOptTable.ToRelContext context,
-      RelOptTable relOptTable) {
-    final RelOptCluster cluster = context.getCluster();
-    // cluster, traits, grpScan, table
-//    return new MongoTableScan(cluster, 
cluster.traitSetOf(MongoRel.CONVENTION),
-//        relOptTable, relOptTable.getRowType(), null);
-    return null;
-  }
-
-  /** Executes a "find" operation on the underlying collection.
-   *
-   * <p>For example,
-   * <code>zipsTable.find("{state: 'OR'}", "{city: 1, zipcode: 1}")</code></p>
-   *
-   * @param mongoDb MongoDB connection
-   * @param filterJson Filter JSON string, or null
-   * @param projectJson Project JSON string, or null
-   * @param fields List of fields to project; or null to return map
-   * @return Enumerator of results
-   */
-  private Enumerable<Object> find(MongoDatabase mongoDb, String filterJson,
-      String projectJson, List<Map.Entry<String, Class>> fields) {
-    final MongoCollection collection =
-        mongoDb.getCollection(collectionName);
-    final Bson filter =
-        filterJson == null ? null : BsonDocument.parse(filterJson);
-    final Bson project =
-        projectJson == null ? null : BsonDocument.parse(projectJson);
-//    final Function1<Document, Object> getter = 
MongoEnumerator.getter(fields);
-//    return new AbstractEnumerable<Object>() {
-//      @Override public Enumerator<Object> enumerator() {
-//        @SuppressWarnings("unchecked") final FindIterable<Document> cursor =
-//            collection.find(filter).projection(project);
-//        return new MongoEnumerator(cursor.iterator(), getter);
-//      }
-//    };
-    return null;
-  }
-
-  /** Executes an "aggregate" operation on the underlying collection.
-   *
-   * <p>For example:
-   * <code>zipsTable.aggregate(
-   * "{$filter: {state: 'OR'}",
-   * "{$group: {_id: '$city', c: {$sum: 1}, p: {$sum: '$pop'}}}")
-   * </code></p>
-   *
-   * @param mongoDb MongoDB connection
-   * @param fields List of fields to project; or null to return map
-   * @param operations One or more JSON strings
-   * @return Enumerator of results
-   */
-  private Enumerable<Object> aggregate(final MongoDatabase mongoDb,
-      List<Map.Entry<String, Class>> fields,
-      List<String> operations) {
-    List<Bson> list = new ArrayList<>();
-    for (String operation : operations) {
-      list.add(BsonDocument.parse(operation));
-    }
-//    Function1<Document, Object> getter =
-//        MongoEnumerator.getter(fields);
-//    return new AbstractEnumerable<Object>() {
-//      @Override public Enumerator<Object> enumerator() {
-//        final Iterator<Document> resultIterator;
-//        try {
-//          resultIterator = mongoDb.getCollection(collectionName)
-//              .aggregate(list).iterator();
-//        } catch (Exception e) {
-//          throw new RuntimeException("While running MongoDB query "
-//              + Util.toString(operations, "[", ",\n", "]"), e);
-//        }
-//        return new MongoEnumerator(resultIterator, getter);
-//      }
-//    };
-    return null;
-  }
-
-  /** Implementation of {@link Queryable} based on
-   * a {@link MongoTable}.
-   *
-   * @param <T> element type */
-  public static class MongoQueryable<T> extends AbstractTableQueryable<T> {
-    MongoQueryable(QueryProvider queryProvider, SchemaPlus schema,
-        MongoTable table, String tableName) {
-      super(queryProvider, schema, table, tableName);
-    }
-
-    @Override public Enumerator<T> enumerator() {
-      //noinspection unchecked
-      final Enumerable<T> enumerable =
-          (Enumerable<T>) getTable().find(getMongoDb(), null, null, null);
-      return enumerable.enumerator();
-    }
-
-    private MongoDatabase getMongoDb() {
-//      return schema.unwrap(MongoSchemaFactory.MongoSchema.class).mongoDb;
-      return null;
-    }
-
-    private MongoTable getTable() {
-      return (MongoTable) table;
-    }
-
-    @SuppressWarnings("UnusedDeclaration")
-    public Enumerable<Object> aggregate(List<Map.Entry<String, Class>> fields,
-        List<String> operations) {
-      return getTable().aggregate(getMongoDb(), fields, operations);
-    }
-
-    /** Called via code-generation.
-     *
-     * @param filterJson Filter document
-     * @param projectJson Projection document
-     * @param fields List of expected fields (and their types)
-     * @return result of mongo query
-     *
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public Enumerable<Object> find(String filterJson,
-        String projectJson, List<Map.Entry<String, Class>> fields) {
-      return getTable().find(getMongoDb(), filterJson, projectJson, fields);
-    }
-  }
-}
diff --git 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java
 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java
index 478591f..8320fa9 100644
--- 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java
+++ 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java
@@ -1,5 +1,6 @@
 package org.apache.drill.exec.store.mongo.plan;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.calcite.adapter.enumerable.RexImpTable;
 import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
@@ -21,7 +22,6 @@ import org.bson.BsonString;
 import org.bson.BsonValue;
 
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -35,59 +35,57 @@ class RexToMongoTranslator extends 
RexVisitorImpl<BsonValue> {
 
   private final List<String> inFields;
 
-  private static final Map<SqlOperator, String> MONGO_OPERATORS =
-      new HashMap<>();
-
-  static {
-    MONGO_OPERATORS.put(SqlStdOperatorTable.DIVIDE, "$divide");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.MULTIPLY, "$multiply");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.ABS, "$abs");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.ACOS, "$acos");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.ASIN, "$asin");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.ATAN, "$atan");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.ATAN2, "$atan2");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.CEIL, "$ceil");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.CONCAT, "$concat");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.COS, "$cos");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.DAYOFMONTH, "$dayOfMonth");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.WEEK, "$isoWeek");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.YEAR, "$isoWeekYear");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.DAYOFWEEK, "$isoDayOfWeek");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.DAYOFYEAR, "$dayOfYear");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.RADIANS, "$degreesToRadians");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.DENSE_RANK, "$denseRank");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.EXP, "$exp");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.FLOOR, "$floor");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.HOUR, "$hour");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.LN, "$ln");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.LOG10, "$log10");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.MINUTE, "$minute");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.MOD, "$mod");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.MONTH, "$month");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.POWER, "$pow");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.DEGREES, "$radiansToDegrees");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.RAND, "$rand");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.REPLACE, "$replaceAll");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.ROUND, "$round");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.SECOND, "$second");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.SIN, "$sin");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.SQRT, "$sqrt");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.SUBSTRING, "$substr");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.PLUS, "$add");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.MINUS, "$subtract");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.TAN, "$tan");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.TRIM, "trim");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.TRUNCATE, "$trunc");
-    MONGO_OPERATORS.put(SqlStdOperatorTable.AND, MongoOp.AND.getCompareOp());
-    MONGO_OPERATORS.put(SqlStdOperatorTable.OR, MongoOp.OR.getCompareOp());
-    MONGO_OPERATORS.put(SqlStdOperatorTable.NOT, MongoOp.NOT.getCompareOp());
-    MONGO_OPERATORS.put(SqlStdOperatorTable.EQUALS, 
MongoOp.EQUAL.getCompareOp());
-    MONGO_OPERATORS.put(SqlStdOperatorTable.NOT_EQUALS, 
MongoOp.NOT_EQUAL.getCompareOp());
-    MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN, 
MongoOp.GREATER.getCompareOp());
-    MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, 
MongoOp.GREATER_OR_EQUAL.getCompareOp());
-    MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN, 
MongoOp.LESS.getCompareOp());
-    MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, 
MongoOp.LESS_OR_EQUAL.getCompareOp());
-  }
+  private static final Map<SqlOperator, String> MONGO_OPERATORS = 
ImmutableMap.<SqlOperator, String>builder()
+    .put(SqlStdOperatorTable.DIVIDE, "$divide")
+    .put(SqlStdOperatorTable.MULTIPLY, "$multiply")
+    .put(SqlStdOperatorTable.ABS, "$abs")
+    .put(SqlStdOperatorTable.ACOS, "$acos")
+    .put(SqlStdOperatorTable.ASIN, "$asin")
+    .put(SqlStdOperatorTable.ATAN, "$atan")
+    .put(SqlStdOperatorTable.ATAN2, "$atan2")
+    .put(SqlStdOperatorTable.CEIL, "$ceil")
+    .put(SqlStdOperatorTable.CONCAT, "$concat")
+    .put(SqlStdOperatorTable.COS, "$cos")
+    .put(SqlStdOperatorTable.DAYOFMONTH, "$dayOfMonth")
+    .put(SqlStdOperatorTable.WEEK, "$isoWeek")
+    .put(SqlStdOperatorTable.YEAR, "$isoWeekYear")
+    .put(SqlStdOperatorTable.DAYOFWEEK, "$isoDayOfWeek")
+    .put(SqlStdOperatorTable.DAYOFYEAR, "$dayOfYear")
+    .put(SqlStdOperatorTable.RADIANS, "$degreesToRadians")
+    .put(SqlStdOperatorTable.DENSE_RANK, "$denseRank")
+    .put(SqlStdOperatorTable.EXP, "$exp")
+    .put(SqlStdOperatorTable.FLOOR, "$floor")
+    .put(SqlStdOperatorTable.HOUR, "$hour")
+    .put(SqlStdOperatorTable.LN, "$ln")
+    .put(SqlStdOperatorTable.LOG10, "$log10")
+    .put(SqlStdOperatorTable.MINUTE, "$minute")
+    .put(SqlStdOperatorTable.MOD, "$mod")
+    .put(SqlStdOperatorTable.MONTH, "$month")
+    .put(SqlStdOperatorTable.POWER, "$pow")
+    .put(SqlStdOperatorTable.DEGREES, "$radiansToDegrees")
+    .put(SqlStdOperatorTable.RAND, "$rand")
+    .put(SqlStdOperatorTable.REPLACE, "$replaceAll")
+    .put(SqlStdOperatorTable.ROUND, "$round")
+    .put(SqlStdOperatorTable.SECOND, "$second")
+    .put(SqlStdOperatorTable.SIN, "$sin")
+    .put(SqlStdOperatorTable.SQRT, "$sqrt")
+    .put(SqlStdOperatorTable.SUBSTRING, "$substr")
+    .put(SqlStdOperatorTable.PLUS, "$add")
+    .put(SqlStdOperatorTable.MINUS, "$subtract")
+    .put(SqlStdOperatorTable.TAN, "$tan")
+    .put(SqlStdOperatorTable.TRIM, "trim")
+    .put(SqlStdOperatorTable.TRUNCATE, "$trunc")
+    .put(SqlStdOperatorTable.AND, MongoOp.AND.getCompareOp())
+    .put(SqlStdOperatorTable.OR, MongoOp.OR.getCompareOp())
+    .put(SqlStdOperatorTable.NOT, MongoOp.NOT.getCompareOp())
+    .put(SqlStdOperatorTable.EQUALS, MongoOp.EQUAL.getCompareOp())
+    .put(SqlStdOperatorTable.NOT_EQUALS, MongoOp.NOT_EQUAL.getCompareOp())
+    .put(SqlStdOperatorTable.GREATER_THAN, MongoOp.GREATER.getCompareOp())
+    .put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, 
MongoOp.GREATER_OR_EQUAL.getCompareOp())
+    .put(SqlStdOperatorTable.LESS_THAN, MongoOp.LESS.getCompareOp())
+    .put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, 
MongoOp.LESS_OR_EQUAL.getCompareOp())
+    .build();
+
 
   protected RexToMongoTranslator(JavaTypeFactory typeFactory,
       List<String> inFields) {
@@ -129,11 +127,14 @@ class RexToMongoTranslator extends 
RexVisitorImpl<BsonValue> {
       return new BsonDocument(stdOperator, new BsonArray(strings));
     }
     if (call.getOperator() == SqlStdOperatorTable.ITEM) {
-      final RexNode op1 = call.operands.get(1);
-      if (op1 instanceof RexLiteral
-          && op1.getType().getSqlTypeName() == SqlTypeName.INTEGER) {
-        return new BsonDocument("$arrayElemAt", new BsonArray(
-            Arrays.asList(strings.get(0), new BsonInt32(((RexLiteral) 
op1).getValueAs(Integer.class)))));
+      RexNode op1 = call.operands.get(1);
+      if (op1 instanceof RexLiteral) {
+        if (op1.getType().getSqlTypeName() == SqlTypeName.INTEGER) {
+          return new BsonDocument("$arrayElemAt", new BsonArray(
+              Arrays.asList(strings.get(0), new BsonInt32(((RexLiteral) 
op1).getValueAs(Integer.class)))));
+        } else if (op1.getType().getSqlTypeName() == SqlTypeName.CHAR) {
+          return new BsonString(strings.get(0).asString().getValue() + "." + 
((RexLiteral) op1).getValueAs(String.class));
+        }
       }
     }
     if (call.getOperator() == SqlStdOperatorTable.CASE) {
@@ -171,8 +172,8 @@ class RexToMongoTranslator extends 
RexVisitorImpl<BsonValue> {
     if (call.getOperator() != SqlStdOperatorTable.ITEM) {
       return null;
     }
-    final RexNode op0 = call.operands.get(0);
-    final RexNode op1 = call.operands.get(1);
+    RexNode op0 = call.operands.get(0);
+    RexNode op1 = call.operands.get(1);
     if (op0 instanceof RexInputRef
         && ((RexInputRef) op0).getIndex() == 0
         && op1 instanceof RexLiteral
@@ -181,4 +182,44 @@ class RexToMongoTranslator extends 
RexVisitorImpl<BsonValue> {
     }
     return null;
   }
+
+  public static boolean supportsExpression(RexNode expr) {
+    return expr.accept(new RexMongoChecker());
+  }
+
+  private static class RexMongoChecker extends RexVisitorImpl<Boolean> {
+
+    protected RexMongoChecker() {
+      super(true);
+    }
+
+    @Override
+    public Boolean visitLiteral(RexLiteral literal) {
+      return true;
+    }
+
+    @Override
+    public Boolean visitInputRef(RexInputRef inputRef) {
+      return true;
+    }
+
+    @Override
+    public Boolean visitCall(RexCall call) {
+      if (isItem(call) != null
+        || call.getKind() == SqlKind.CAST
+        || call.getOperator() == SqlStdOperatorTable.CASE
+        || MONGO_OPERATORS.get(call.getOperator()) != null) {
+        return true;
+      }
+
+      if (call.getOperator() == SqlStdOperatorTable.ITEM) {
+        RexNode op = call.operands.get(1);
+        return op instanceof RexLiteral
+          && (op.getType().getSqlTypeName() == SqlTypeName.INTEGER
+          || op.getType().getSqlTypeName() == SqlTypeName.CHAR);
+      }
+
+      return false;
+    }
+  }
 }
diff --git 
a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
 
b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
index bb660a0..2344df7 100644
--- 
a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
+++ 
b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
@@ -33,7 +33,7 @@ public class TestMongoLimitPushDown extends MongoTestBase {
         .sql(sql)
         .planMatcher()
         .exclude("Limit\\(")
-        .include("MongoGroupScan.*limit=4")
+        .include("MongoGroupScan.*\"\\$limit\": 4")
         .match();
   }
 
@@ -44,7 +44,7 @@ public class TestMongoLimitPushDown extends MongoTestBase {
       .sql(sql)
       .planMatcher()
       .exclude("Limit")
-      .include("sort=\\{employee_id", "limit=4")
+      .include("MongoGroupScan.*\"\\$sort\": \\{\"employee_id\": 1}", 
"\"\\$limit\": 4")
       .match();
   }
 
@@ -55,7 +55,7 @@ public class TestMongoLimitPushDown extends MongoTestBase {
       .sql(sql)
       .planMatcher()
       .exclude("Limit")
-      .include("skip=5", "limit=4")
+      .include("\"\\$skip\": 5", "\"\\$limit\": 4")
       .match();
   }
 
@@ -66,7 +66,7 @@ public class TestMongoLimitPushDown extends MongoTestBase {
       .sql(sql)
       .planMatcher()
       .exclude("Limit")
-      .include("limit=4", "eq=52.17")
+      .include("\"\\$limit\": 4", "\"\\$eq\": 52\\.17")
       .match();
   }
 }
diff --git 
a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
 
b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
index 48d13ce..a691443 100644
--- 
a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
+++ 
b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
@@ -23,21 +23,21 @@ import static org.apache.drill.test.TestBuilder.mapOf;
 import org.apache.drill.categories.MongoStorageTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.exec.ExecConstants;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Ignore("DRILL-3775")
 @Category({SlowTest.class, MongoStorageTest.class})
 public class TestMongoProjectPushDown extends MongoTestBase {
 
-  /**
-   *
-   * @throws Exception
-   */
   @Test
   public void testComplexProjectPushdown() throws Exception {
 
+    queryBuilder()
+      .sql("select t.field_4.inner_3 as col_1, t.field_4 as col_2 from 
mongo.employee.schema_change t")
+      .planMatcher()
+      .include("MongoGroupScan.*\"\\$project\": \\{\"col_1\": 
\"\\$field_4.inner_3\", \"col_2\": \"\\$field_4\"\\}")
+      .match();
+
     try {
       testBuilder()
           .sqlQuery("select t.field_4.inner_3 as col_1, t.field_4 as col_2 
from mongo.employee.schema_change t")
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java
index c9d4b8c..7219921 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java
@@ -59,6 +59,9 @@ public abstract class PlannableStoragePlugin extends 
AbstractStoragePlugin {
         if (plannableStoragePluginConfigs.supportsUnionPushdown) {
           builder.addAll(rulesProvider.unionRules());
         }
+        if (plannableStoragePluginConfigs.supportsJoinPushdown) {
+          builder.addAll(rulesProvider.joinRules());
+        }
         if (plannableStoragePluginConfigs.supportsAggregatePushdown) {
           builder.addAll(rulesProvider.aggregateRules());
         }
@@ -89,6 +92,7 @@ public abstract class PlannableStoragePlugin extends 
AbstractStoragePlugin {
     private boolean supportsAggregatePushdown;
     private boolean supportsSortPushdown;
     private boolean supportsUnionPushdown;
+    private boolean supportsJoinPushdown;
     private boolean supportsLimitPushdown;
     private PluginRulesProvider rulesProvider;
     private Convention convention;
@@ -158,6 +162,15 @@ public abstract class PlannableStoragePlugin extends 
AbstractStoragePlugin {
       return self();
     }
 
+    public boolean supportsJoinPushdown() {
+      return supportsJoinPushdown;
+    }
+
+    public T supportsJoinPushdown(boolean supportsJoinPushdown) {
+      this.supportsJoinPushdown = supportsJoinPushdown;
+      return self();
+    }
+
     public boolean supportsLimitPushdown() {
       return supportsLimitPushdown;
     }
@@ -185,5 +198,4 @@ public abstract class PlannableStoragePlugin extends 
AbstractStoragePlugin {
       return this;
     }
   }
-
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
index d35bf83..ab566ba 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
@@ -11,6 +11,7 @@ public interface PluginRulesProvider {
   List<RelOptRule> projectRules();
   List<RelOptRule> aggregateRules();
   List<RelOptRule> unionRules();
+  List<RelOptRule> joinRules();
   RelOptRule vertexRule();
   RelOptRule prelConverterRule();
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProviderImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProviderImpl.java
new file mode 100644
index 0000000..9337a42
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProviderImpl.java
@@ -0,0 +1,97 @@
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rule.PluginAggregateRule;
+import org.apache.drill.exec.store.plan.rule.PluginFilterRule;
+import 
org.apache.drill.exec.store.plan.rule.PluginIntermediatePrelConverterRule;
+import org.apache.drill.exec.store.plan.rule.PluginJoinRule;
+import org.apache.drill.exec.store.plan.rule.PluginLimitRule;
+import org.apache.drill.exec.store.plan.rule.PluginProjectRule;
+import org.apache.drill.exec.store.plan.rule.PluginSortRule;
+import org.apache.drill.exec.store.plan.rule.PluginUnionRule;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+
+public class PluginRulesProviderImpl implements PluginRulesProvider {
+  private final Supplier<PluginImplementor> implementorSupplier;
+  private final PluginImplementor pluginImplementor;
+  private final Convention convention;
+
+  public PluginRulesProviderImpl(Convention convention, 
Supplier<PluginImplementor> implementorSupplier) {
+    this.convention = convention;
+    this.implementorSupplier = implementorSupplier;
+    this.pluginImplementor = implementorSupplier.get();
+  }
+
+  @Override
+  public List<RelOptRule> sortRules() {
+    return Arrays.asList(
+        new PluginSortRule(Convention.NONE, convention, pluginImplementor),
+        new PluginSortRule(DrillRel.DRILL_LOGICAL, convention, 
pluginImplementor)
+    );
+  }
+
+  @Override
+  public List<RelOptRule> limitRules() {
+    return Arrays.asList(
+        new PluginLimitRule(Convention.NONE, convention, pluginImplementor),
+        new PluginLimitRule(DrillRel.DRILL_LOGICAL, convention, 
pluginImplementor)
+    );
+  }
+
+  @Override
+  public List<RelOptRule> filterRules() {
+    return Arrays.asList(
+        new PluginFilterRule(Convention.NONE, convention, pluginImplementor),
+        new PluginFilterRule(DrillRel.DRILL_LOGICAL, convention, 
pluginImplementor)
+    );
+  }
+
+  @Override
+  public List<RelOptRule> projectRules() {
+    return Arrays.asList(
+        new PluginProjectRule(Convention.NONE, convention, pluginImplementor),
+        new PluginProjectRule(DrillRel.DRILL_LOGICAL, convention, 
pluginImplementor)
+    );
+  }
+
+  @Override
+  public List<RelOptRule> aggregateRules() {
+    return Arrays.asList(
+        new PluginAggregateRule(Convention.NONE, convention, 
pluginImplementor),
+        new PluginAggregateRule(DrillRel.DRILL_LOGICAL, convention, 
pluginImplementor)
+    );
+  }
+
+  @Override
+  public List<RelOptRule> unionRules() {
+    return Arrays.asList(
+        new PluginUnionRule(Convention.NONE, convention, pluginImplementor),
+        new PluginUnionRule(DrillRel.DRILL_LOGICAL, convention, 
pluginImplementor)
+    );
+  }
+
+  @Override
+  public List<RelOptRule> joinRules() {
+    return Arrays.asList(
+      new PluginJoinRule(Convention.NONE, convention, pluginImplementor),
+      new PluginJoinRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+    );
+  }
+
+  @Override
+  public RelOptRule vertexRule() {
+    return new VertexDrelConverterRule(convention);
+  }
+
+  @Override
+  public RelOptRule prelConverterRule() {
+    return new PluginIntermediatePrelConverterRule(convention, 
implementorSupplier);
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
new file mode 100644
index 0000000..1d6c744
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
@@ -0,0 +1,113 @@
+package org.apache.drill.exec.store.plan;
+
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public abstract class AbstractPluginImplementor implements PluginImplementor {
+  private static final Logger logger = 
LoggerFactory.getLogger(AbstractPluginImplementor.class);
+
+  @Override
+  public void implement(PluginAggregateRel aggregate) throws IOException {
+    throw getUnsupported("aggregate");
+  }
+
+  @Override
+  public void implement(PluginFilterRel filter) throws IOException {
+    throw getUnsupported("filter");
+  }
+
+  @Override
+  public void implement(PluginLimitRel limit) throws IOException {
+    throw getUnsupported("limit");
+  }
+
+  @Override
+  public void implement(PluginProjectRel project) throws IOException {
+    throw getUnsupported("project");
+  }
+
+  @Override
+  public void implement(PluginSortRel sort) throws IOException {
+    throw getUnsupported("sort");
+  }
+
+  @Override
+  public void implement(PluginUnionRel union) throws IOException {
+    throw getUnsupported("union");
+  }
+
+  @Override
+  public void implement(PluginJoinRel join) throws IOException {
+    throw getUnsupported("join");
+  }
+
+  @Override
+  public void implement(StoragePluginTableScan scan) throws IOException {
+    throw getUnsupported("scan");
+  }
+
+  @Override
+  public boolean canImplement(Aggregate aggregate) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(Filter filter) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(DrillLimitRelBase limit) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(Project project) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(Sort sort) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(Union union) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(TableScan scan) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(Join scan) {
+    return false;
+  }
+
+  private UserException getUnsupported(String rel) {
+    return UserException.unsupportedError()
+        .message("Plugin implementor doesn't support push down for %", rel)
+        .build(logger);
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
index 31f388d..77013e8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
@@ -1,9 +1,18 @@
 package org.apache.drill.exec.store.plan;
 
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
 import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
 import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
 import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
 import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
 import org.apache.drill.exec.store.plan.rel.PluginRel;
@@ -27,9 +36,25 @@ public interface PluginImplementor {
 
   void implement(PluginUnionRel union) throws IOException;
 
+  void implement(PluginJoinRel join) throws IOException;
+
   void implement(StoragePluginTableScan scan) throws IOException;
 
-  PluginImplementor copy();
+  boolean canImplement(Aggregate aggregate);
+
+  boolean canImplement(Filter filter);
+
+  boolean canImplement(DrillLimitRelBase limit);
+
+  boolean canImplement(Project project);
+
+  boolean canImplement(Sort sort);
+
+  boolean canImplement(Union union);
+
+  boolean canImplement(Join scan);
+
+  boolean canImplement(TableScan scan);
 
   GroupScan getPhysicalOperator() throws IOException;
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
index 8505ac7..98b8ac1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
@@ -20,7 +20,6 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
@@ -40,38 +39,17 @@ public class PluginAggregateRel extends 
DrillAggregateRelBase implements PluginR
       RelNode input,
       ImmutableBitSet groupSet,
       List<ImmutableBitSet> groupSets,
-      List<AggregateCall> aggCalls)
-      throws InvalidRelException {
+      List<AggregateCall> aggCalls) {
     super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
     assert getConvention() == input.getConvention();
-
-    for (AggregateCall aggCall : aggCalls) {
-      if (aggCall.isDistinct()) {
-        throw new InvalidRelException(
-            "distinct aggregation not supported");
-      }
-    }
-    switch (getGroupType()) {
-    case SIMPLE:
-      break;
-    default:
-      throw new InvalidRelException("unsupported group type: "
-          + getGroupType());
-    }
   }
 
   @Override
   public Aggregate copy(RelTraitSet traitSet, RelNode input,
       ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
       List<AggregateCall> aggCalls) {
-    try {
-      return new PluginAggregateRel(getCluster(), traitSet, input,
-          groupSet, groupSets, aggCalls);
-    } catch (InvalidRelException e) {
-      // Semantic error not possible. Must be a bug. Convert to
-      // internal error.
-      throw new AssertionError(e);
-    }
+    return new PluginAggregateRel(getCluster(), traitSet, input,
+        groupSet, groupSets, aggCalls);
   }
 
   @Override
@@ -84,4 +62,9 @@ public class PluginAggregateRel extends DrillAggregateRelBase 
implements PluginR
   public void implement(PluginImplementor implementor) throws IOException {
     implementor.implement(this);
   }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java
index 661ce21..d36ace7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java
@@ -56,4 +56,9 @@ public class PluginFilterRel extends DrillFilterRelBase 
implements PluginRel {
   public void implement(PluginImplementor implementor) throws IOException {
     implementor.implement(this);
   }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginJoinRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginJoinRel.java
new file mode 100644
index 0000000..aaad170
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginJoinRel.java
@@ -0,0 +1,34 @@
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillJoinRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+
+public class PluginJoinRel extends DrillJoinRelBase implements PluginRel {
+
+  public PluginJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode 
left, RelNode right, RexNode condition, JoinRelType joinType) {
+    super(cluster, traits, left, right, condition, joinType);
+  }
+
+  @Override
+  public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, 
RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+    return new PluginJoinRel(getCluster(), traitSet, left, right, 
conditionExpr, joinType);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return false;
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java
index b74aac4..4029a1d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java
@@ -51,4 +51,9 @@ public class PluginLimitRel extends DrillLimitRelBase 
implements PluginRel {
   public void implement(PluginImplementor implementor) throws IOException {
     implementor.implement(this);
   }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java
index 49a2cac..1168aaa 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java
@@ -56,4 +56,9 @@ public class PluginProjectRel extends DrillProjectRelBase 
implements PluginRel {
   public void implement(PluginImplementor implementor) throws IOException {
     implementor.implement(this);
   }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
index 7364e70..d6e1c47 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
@@ -8,4 +8,5 @@ import java.io.IOException;
 public interface PluginRel extends RelNode {
   void implement(PluginImplementor implementor) throws IOException;
 
+  boolean canImplement(PluginImplementor implementor);
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java
index 255bb2d..ac937b8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java
@@ -58,4 +58,9 @@ public class PluginSortRel extends DrillSortRelBase 
implements PluginRel {
   public boolean canBeDropped() {
     return false;
   }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
index 73227bd..5f75cdc 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
@@ -39,4 +39,9 @@ public class PluginUnionRel extends DrillUnionRelBase 
implements PluginRel {
   public void implement(PluginImplementor implementor) throws IOException {
     implementor.implement(this);
   }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
index a062b2e..fbfb085 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
@@ -71,4 +71,9 @@ public class StoragePluginTableScan extends DrillScanRelBase 
implements PluginRe
   protected String computeDigest() {
     return super.computeDigest();
   }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java
index 616493d..111787f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java
@@ -2,37 +2,26 @@ package org.apache.drill.exec.store.plan.rule;
 
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
+import org.apache.drill.exec.store.plan.PluginImplementor;
 import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class PluginAggregateRule extends PluginConverterRule {
-  private static final Logger logger = 
LoggerFactory.getLogger(PluginAggregateRule.class);
 
-  public PluginAggregateRule(RelTrait in, Convention out) {
-    super(Aggregate.class, in, out, "PluginAggregateRule");
+  public PluginAggregateRule(RelTrait in, Convention out, PluginImplementor 
pluginImplementor) {
+    super(Aggregate.class, in, out, "PluginAggregateRule", pluginImplementor);
   }
 
   @Override
   public RelNode convert(RelNode rel) {
     Aggregate agg = (Aggregate) rel;
-    RelTraitSet traitSet =
-        agg.getTraitSet().replace(getOutConvention());
-    try {
-      return new PluginAggregateRel(
-          rel.getCluster(),
-          traitSet,
-          convert(agg.getInput(), traitSet.simplify()),
-          agg.getGroupSet(),
-          agg.getGroupSets(),
-          agg.getAggCallList());
-    } catch (InvalidRelException e) {
-      logger.warn(e.toString());
-      return null;
-    }
+    return new PluginAggregateRel(
+        rel.getCluster(),
+        agg.getTraitSet().replace(getOutConvention()),
+        convert(agg.getInput(), 
agg.getTraitSet().replace(getOutConvention()).simplify()),
+        agg.getGroupSet(),
+        agg.getGroupSets(),
+        agg.getAggCallList());
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java
index 5801d83..15d6baf 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java
@@ -1,16 +1,54 @@
 package org.apache.drill.exec.store.plan.rule;
 
 import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.store.plan.PluginImplementor;
 
 import java.util.function.Predicate;
 
 public abstract class PluginConverterRule extends ConverterRule {
+  private final PluginImplementor pluginImplementor;
 
-  protected PluginConverterRule(Class<? extends RelNode> clazz, RelTrait in, 
Convention out, String description) {
+  protected PluginConverterRule(Class<? extends RelNode> clazz,
+      RelTrait in, Convention out, String description, PluginImplementor 
pluginImplementor) {
     super(clazz, (Predicate<RelNode>) input -> true, in, out, 
DrillRelFactories.LOGICAL_BUILDER, description);
+    this.pluginImplementor = pluginImplementor;
+  }
+
+  public PluginImplementor getPluginImplementor() {
+    return pluginImplementor;
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    RelNode rel = call.rel(0);
+    boolean canImplement = false;
+    if (rel instanceof Aggregate) {
+      canImplement = pluginImplementor.canImplement(((Aggregate) rel));
+    } else if (rel instanceof Filter) {
+      canImplement = pluginImplementor.canImplement(((Filter) rel));
+    } else if (rel instanceof DrillLimitRelBase) {
+      canImplement = pluginImplementor.canImplement(((DrillLimitRelBase) rel));
+    } else if (rel instanceof Project) {
+      canImplement = pluginImplementor.canImplement(((Project) rel));
+    } else if (rel instanceof Sort) {
+      canImplement = pluginImplementor.canImplement(((Sort) rel));
+    } else if (rel instanceof Union) {
+      canImplement = pluginImplementor.canImplement(((Union) rel));
+    } else if (rel instanceof Join) {
+      canImplement = pluginImplementor.canImplement(((Join) rel));
+    }
+    return canImplement && super.matches(call);
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
index 8d350f4..4685d43 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
@@ -2,9 +2,9 @@ package org.apache.drill.exec.store.plan.rule;
 
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Filter;
+import org.apache.drill.exec.store.plan.PluginImplementor;
 import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
 
 /**
@@ -12,18 +12,17 @@ import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
  */
 public class PluginFilterRule extends PluginConverterRule {
 
-  public PluginFilterRule(RelTrait in, Convention out) {
-    super(Filter.class, in, out, "PluginFilterRule");
+  public PluginFilterRule(RelTrait in, Convention out, PluginImplementor 
pluginImplementor) {
+    super(Filter.class, in, out, "PluginFilterRule", pluginImplementor);
   }
 
   @Override
   public RelNode convert(RelNode rel) {
     Filter filter = (Filter) rel;
-    RelTraitSet traitSet = filter.getTraitSet().replace(getOutConvention());
     return new PluginFilterRel(
         getOutConvention(),
         rel.getCluster(),
-        traitSet,
+        filter.getTraitSet().replace(getOutConvention()),
         convert(filter.getInput(), getOutConvention()),
         filter.getCondition());
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
index 5b08891..3cde901 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
@@ -17,34 +17,49 @@
  */
 package org.apache.drill.exec.store.plan.rule;
 
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 import org.apache.drill.exec.store.plan.PluginImplementor;
 import org.apache.drill.exec.store.plan.rel.PluginIntermediatePrelRel;
 
-import java.util.function.Predicate;
 import java.util.function.Supplier;
 
-public class PluginIntermediatePrelConverterRule extends ConverterRule {
-
+public class PluginIntermediatePrelConverterRule extends RelOptRule {
   private final Supplier<PluginImplementor> implementorFactory;
+  private final RelTrait inTrait;
+  private final RelTrait outTrait;
 
-  public PluginIntermediatePrelConverterRule(Supplier<PluginImplementor> 
implementorFactory) {
-    super(VertexDrel.class, (Predicate<RelNode>) input -> true, 
DrillRel.DRILL_LOGICAL,
-        Prel.DRILL_PHYSICAL, DrillRelFactories.LOGICAL_BUILDER, 
"Plugin_prel_Converter");
+  public PluginIntermediatePrelConverterRule(Convention convention, 
Supplier<PluginImplementor> implementorFactory) {
+    super(
+        RelOptHelper.some(VertexDrel.class, DrillRel.DRILL_LOGICAL,
+            RelOptHelper.any(RelNode.class, convention)),
+        DrillRelFactories.LOGICAL_BUILDER, 
"EnumerableIntermediatePrelConverterRule" + convention);
     this.implementorFactory = implementorFactory;
+    this.inTrait = DrillRel.DRILL_LOGICAL;
+    this.outTrait = Prel.DRILL_PHYSICAL;
   }
 
   @Override
-  public RelNode convert(RelNode in) {
-    return new PluginIntermediatePrelRel(
+  public void onMatch(RelOptRuleCall call) {
+    VertexDrel in = call.rel(0);
+    RelNode intermediatePrel = new PluginIntermediatePrelRel(
         in.getCluster(),
-        in.getTraitSet().replace(getOutTrait()),
+        in.getTraitSet().replace(outTrait),
         in.getInput(0),
         implementorFactory);
+    call.transformTo(intermediatePrel);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    return super.matches(call) && call.rel(0).getTraitSet().contains(inTrait);
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
new file mode 100644
index 0000000..8b042f4
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
@@ -0,0 +1,32 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+
+/**
+ * Rule to convert a {@link Project} to a {@link PluginProjectRel}.
+ */
+public class PluginJoinRule extends PluginConverterRule {
+
+  public PluginJoinRule(RelTrait in, Convention out, PluginImplementor 
pluginImplementor) {
+    super(Join.class, in, out, "PluginProjectRule", pluginImplementor);
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Join join = (Join) rel;
+    return new PluginJoinRel(
+        join.getCluster(),
+        join.getTraitSet().replace(getOutConvention()),
+        convert(join.getLeft(), getOutConvention()),
+        convert(join.getRight(), getOutConvention()),
+        join.getCondition(),
+        join.getJoinType());
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java
index fea2276..e1ba189 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java
@@ -2,24 +2,26 @@ package org.apache.drill.exec.store.plan.rule;
 
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
 import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
 
 public class PluginLimitRule extends PluginConverterRule {
 
-  public PluginLimitRule(RelTrait in, Convention out) {
-    super(DrillLimitRelBase.class, in, out, "PluginLimitRule");
+  public PluginLimitRule(RelTrait in, Convention out, PluginImplementor 
pluginImplementor) {
+    super(DrillLimitRelBase.class, in, out, "PluginLimitRule", 
pluginImplementor);
   }
 
   @Override
   public RelNode convert(RelNode rel) {
     DrillLimitRelBase sort = (DrillLimitRelBase) rel;
-    RelTraitSet traitSet =
-        sort.getTraitSet().replace(getOutConvention());
     RelNode input = convert(sort.getInput(), 
sort.getInput().getTraitSet().replace(getOutConvention()).simplify());
-    return new PluginLimitRel(rel.getCluster(), traitSet, input,
-        sort.getOffset(), sort.getFetch());
+    return new PluginLimitRel(
+        rel.getCluster(),
+        sort.getTraitSet().replace(getOutConvention()),
+        input,
+        sort.getOffset(),
+        sort.getFetch());
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java
index d02bd1e..27ff94b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java
@@ -2,9 +2,9 @@ package org.apache.drill.exec.store.plan.rule;
 
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Project;
+import org.apache.drill.exec.store.plan.PluginImplementor;
 import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
 
 /**
@@ -12,16 +12,19 @@ import 
org.apache.drill.exec.store.plan.rel.PluginProjectRel;
  */
 public class PluginProjectRule extends PluginConverterRule {
 
-  public PluginProjectRule(RelTrait in, Convention out) {
-    super(Project.class, in, out, "PluginProjectRule");
+  public PluginProjectRule(RelTrait in, Convention out, PluginImplementor 
pluginImplementor) {
+    super(Project.class, in, out, "PluginProjectRule", pluginImplementor);
   }
 
   @Override
   public RelNode convert(RelNode rel) {
     Project project = (Project) rel;
-    RelTraitSet traitSet = project.getTraitSet().replace(getOutConvention());
-    return new PluginProjectRel(getOutConvention(), project.getCluster(), 
traitSet,
-        convert(project.getInput(), getOutConvention()), project.getProjects(),
+    return new PluginProjectRel(
+        getOutConvention(),
+        project.getCluster(),
+        project.getTraitSet().replace(getOutConvention()),
+        convert(project.getInput(), getOutConvention()),
+        project.getProjects(),
         project.getRowType());
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java
index 0378b0e..2e0a9f9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java
@@ -2,9 +2,9 @@ package org.apache.drill.exec.store.plan.rule;
 
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Sort;
+import org.apache.drill.exec.store.plan.PluginImplementor;
 import org.apache.drill.exec.store.plan.rel.PluginSortRel;
 
 /**
@@ -12,17 +12,20 @@ import org.apache.drill.exec.store.plan.rel.PluginSortRel;
  */
 public class PluginSortRule extends PluginConverterRule {
 
-  public PluginSortRule(RelTrait in, Convention out) {
-    super(Sort.class, in, out, "PluginSortRule");
+  public PluginSortRule(RelTrait in, Convention out, PluginImplementor 
pluginImplementor) {
+    super(Sort.class, in, out, "PluginSortRule", pluginImplementor);
   }
 
   @Override
   public RelNode convert(RelNode rel) {
     Sort sort = (Sort) rel;
-    RelTraitSet traitSet = sort.getTraitSet().replace(getOutConvention())
-            .replace(sort.getCollation());
     RelNode input = convert(sort.getInput(), 
sort.getInput().getTraitSet().replace(getOutConvention()).simplify());
-    return new PluginSortRel(rel.getCluster(), traitSet, input,
-        sort.getCollation(), sort.offset, sort.fetch);
+    return new PluginSortRel(
+        rel.getCluster(),
+        
sort.getTraitSet().replace(getOutConvention()).replace(sort.getCollation()),
+        input,
+        sort.getCollation(),
+        sort.offset,
+        sort.fetch);
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java
index c85f65e..4d058e9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java
@@ -3,40 +3,41 @@ package org.apache.drill.exec.store.plan.rule;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Union;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.store.plan.PluginImplementor;
 import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PluginUnionRule extends PluginConverterRule {
+  private static final Logger logger = 
LoggerFactory.getLogger(PluginUnionRule.class);
 
-  public PluginUnionRule(RelTrait in, Convention out) {
-    super(Union.class, in, out, "PluginUnionRule");
+  public PluginUnionRule(RelTrait in, Convention out, PluginImplementor 
pluginImplementor) {
+    super(Union.class, in, out, "PluginUnionRule", pluginImplementor);
   }
 
   @Override
   public RelNode convert(RelNode rel) {
     Union union = (Union) rel;
-    RelTraitSet traitSet =
-        union.getTraitSet().replace(getOutConvention());
     try {
       return new PluginUnionRel(
           rel.getCluster(),
-          traitSet,
+          union.getTraitSet().replace(getOutConvention()),
           convertList(union.getInputs(), getOutConvention()),
           union.all,
           true);
     } catch (InvalidRelException e) {
-      throw new DrillRuntimeException(e);
+      logger.warn(e.getMessage());
+      return null;
     }
   }
 
-  @Override
-  public boolean matches(RelOptRuleCall call) {
-    // allow converting for union all only, since Drill adds extra aggregation 
for union distinct,
-    // so we will convert both union all and aggregation later
-    return call.<Union>rel(0).all;
-  }
+//  @Override
+//  public boolean matches(RelOptRuleCall call) {
+//    // allow converting for union all only, since Drill adds extra 
aggregation for union distinct,
+//    // so we will convert both union all and aggregation later
+//    return call.<Union>rel(0).all && super.matches(call);
+//  }
 }

Reply via email to