http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/DruidResultEnumerator.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidResultEnumerator.java
 
b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidResultEnumerator.java
deleted file mode 100644
index 35b97b3..0000000
--- 
a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidResultEnumerator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.druid;
-
-/**
- * Created by jhyde on 3/9/16.
- */
-public class DruidResultEnumerator {
-}
-
-// End DruidResultEnumerator.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java 
b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
index 290f548..7b6bc78 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
@@ -22,7 +22,6 @@ import org.apache.calcite.plan.RelOptPredicateList;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
@@ -41,6 +40,7 @@ import org.apache.calcite.rel.rules.PushProjector;
 import org.apache.calcite.rel.rules.SortProjectTransposeRule;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexExecutor;
@@ -50,14 +50,10 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.rex.RexSimplify;
 import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.runtime.PredicateImpl;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.type.SqlTypeFamily;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.tools.RelBuilderFactory;
-import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.calcite.util.trace.CalciteTrace;
@@ -67,7 +63,7 @@ import org.apache.commons.lang3.tuple.Triple;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import org.joda.time.Interval;
@@ -79,6 +75,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import javax.annotation.Nullable;
+
 /**
  * Rules and relational operators for {@link DruidQuery}.
  */
@@ -113,6 +111,8 @@ public class DruidRules {
       new DruidPostAggregationProjectRule(RelFactories.LOGICAL_BUILDER);
   public static final DruidAggregateExtractProjectRule PROJECT_EXTRACT_RULE =
       new DruidAggregateExtractProjectRule(RelFactories.LOGICAL_BUILDER);
+  public static final DruidHavingFilterRule DRUID_HAVING_FILTER_RULE =
+      new DruidHavingFilterRule(RelFactories.LOGICAL_BUILDER);
 
   public static final List<RelOptRule> RULES =
       ImmutableList.of(FILTER,
@@ -127,73 +127,8 @@ public class DruidRules {
           FILTER_PROJECT_TRANSPOSE,
           PROJECT_SORT_TRANSPOSE,
           SORT,
-          SORT_PROJECT_TRANSPOSE);
-
-  /** Predicate that returns whether Druid can not handle an aggregate. */
-  private static final Predicate<Triple<Aggregate, RelNode, DruidQuery>> 
BAD_AGG =
-      new PredicateImpl<Triple<Aggregate, RelNode, DruidQuery>>() {
-        public boolean test(Triple<Aggregate, RelNode, DruidQuery> triple) {
-          final Aggregate aggregate = triple.getLeft();
-          final RelNode node = triple.getMiddle();
-          final DruidQuery query = triple.getRight();
-
-          final CalciteConnectionConfig config = query.getConnectionConfig();
-          for (AggregateCall aggregateCall : aggregate.getAggCallList()) {
-            switch (aggregateCall.getAggregation().getKind()) {
-            case COUNT:
-              // Druid count aggregator can handle 3 scenarios:
-              // 1. count(distinct col) when approximate results
-              //    are acceptable and col is not a metric.
-              //    Note that exact count(distinct column) is handled
-              //    by being rewritten into group by followed by count
-              // 2. count(*)
-              // 3. count(column)
-
-              if 
(checkAggregateOnMetric(ImmutableBitSet.of(aggregateCall.getArgList()),
-                      node, query)) {
-                return true;
-              }
-              // case count(*)
-              if (aggregateCall.getArgList().isEmpty()) {
-                continue;
-              }
-              // case count(column)
-              if (aggregateCall.getArgList().size() == 1 && 
!aggregateCall.isDistinct()) {
-                continue;
-              }
-              // case count(distinct and is approximate)
-              if (aggregateCall.isDistinct()
-                      && (aggregateCall.isApproximate() || 
config.approximateDistinctCount())) {
-                continue;
-              }
-              return true;
-            case SUM:
-            case SUM0:
-            case MIN:
-            case MAX:
-              final RelDataType type = aggregateCall.getType();
-              final SqlTypeName sqlTypeName = type.getSqlTypeName();
-              if 
(SqlTypeFamily.APPROXIMATE_NUMERIC.getTypeNames().contains(sqlTypeName)
-                      || 
SqlTypeFamily.INTEGER.getTypeNames().contains(sqlTypeName)) {
-                continue;
-              } else if 
(SqlTypeFamily.EXACT_NUMERIC.getTypeNames().contains(sqlTypeName)) {
-                // Decimal
-                assert sqlTypeName == SqlTypeName.DECIMAL;
-                if (type.getScale() == 0 || config.approximateDecimal()) {
-                  // If scale is zero or we allow approximating decimal, we 
can proceed
-                  continue;
-                }
-              }
-              // Cannot handle this aggregate function
-              return true;
-            default:
-              // Cannot handle this aggregate function
-              return true;
-            }
-          }
-          return false;
-        }
-      };
+          SORT_PROJECT_TRANSPOSE,
+          DRUID_HAVING_FILTER_RULE);
 
   /**
    * Rule to push a {@link org.apache.calcite.rel.core.Filter} into a {@link 
DruidQuery}.
@@ -231,7 +166,9 @@ public class DruidRules {
           new RexSimplify(rexBuilder, predicates, true, executor);
       final RexNode cond = simplify.simplify(filter.getCondition());
       for (RexNode e : RelOptUtil.conjunctions(cond)) {
-        if (query.isValidFilter(e)) {
+        DruidJsonFilter druidJsonFilter = DruidJsonFilter
+            .toDruidFilters(e, filter.getInput().getRowType(), query);
+        if (druidJsonFilter != null) {
           validPreds.add(e);
         } else {
           nonValidPreds.add(e);
@@ -239,19 +176,17 @@ public class DruidRules {
       }
 
       // Timestamp
-      int timestampFieldIdx = -1;
-      for (int i = 0; i < query.getRowType().getFieldCount(); i++) {
-        if (query.druidTable.timestampFieldName.equals(
-                query.getRowType().getFieldList().get(i).getName())) {
-          timestampFieldIdx = i;
-          break;
-        }
-      }
-
+      int timestampFieldIdx = Iterables
+          .indexOf(query.getRowType().getFieldList(), new 
Predicate<RelDataTypeField>() {
+            @Override public boolean apply(@Nullable RelDataTypeField input) {
+              return 
query.druidTable.timestampFieldName.equals(input.getName());
+            }
+          });
+      RelNode newDruidQuery = query;
       final Triple<List<RexNode>, List<RexNode>, List<RexNode>> triple =
           splitFilters(rexBuilder, query, validPreds, nonValidPreds, 
timestampFieldIdx);
       if (triple.getLeft().isEmpty() && triple.getMiddle().isEmpty()) {
-        // We can't push anything useful to Druid.
+        //it sucks, nothing to push
         return;
       }
       final List<RexNode> residualPreds = new ArrayList<>(triple.getRight());
@@ -262,13 +197,14 @@ public class DruidRules {
         assert timeZone != null;
         intervals = DruidDateTimeUtils.createInterval(
             RexUtil.composeConjunction(rexBuilder, triple.getLeft(), false),
-            timeZone);
+
+            query.getConnectionConfig().timeZone());
         if (intervals == null || intervals.isEmpty()) {
-          // Case we have an filter with extract that can not be written as 
interval push down
+          // Case we have a filter with extract that can not be written as 
interval push down
           triple.getMiddle().addAll(triple.getLeft());
         }
       }
-      RelNode newDruidQuery = query;
+
       if (!triple.getMiddle().isEmpty()) {
         final RelNode newFilter = filter.copy(filter.getTraitSet(), 
Util.last(query.rels),
             RexUtil.composeConjunction(rexBuilder, triple.getMiddle(), false));
@@ -304,13 +240,9 @@ public class DruidRules {
       for (RexNode conj : validPreds) {
         final RelOptUtil.InputReferencedVisitor visitor = new 
RelOptUtil.InputReferencedVisitor();
         conj.accept(visitor);
-        if (visitor.inputPosReferenced.contains(timestampFieldIdx)) {
-          if (visitor.inputPosReferenced.size() != 1) {
-            // Complex predicate, transformation currently not supported
-            nonPushableNodes.add(conj);
-          } else {
-            timeRangeNodes.add(conj);
-          }
+        if (visitor.inputPosReferenced.contains(timestampFieldIdx)
+            && visitor.inputPosReferenced.size() == 1) {
+          timeRangeNodes.add(conj);
         } else {
           pushableNodes.add(conj);
         }
@@ -320,6 +252,36 @@ public class DruidRules {
   }
 
   /**
+   * Rule to Push a Having {@link Filter} into a {@link DruidQuery}
+   */
+  public static class DruidHavingFilterRule extends RelOptRule {
+
+    public DruidHavingFilterRule(RelBuilderFactory relBuilderFactory) {
+      super(operand(Filter.class, operand(DruidQuery.class, none())),
+          relBuilderFactory, null);
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Filter filter = call.rel(0);
+      final DruidQuery query = call.rel(1);
+
+      if (!DruidQuery.isValidSignature(query.signature() + 'h')) {
+        return;
+      }
+
+      final RexNode cond = filter.getCondition();
+      final DruidJsonFilter druidJsonFilter = DruidJsonFilter
+          .toDruidFilters(cond, query.getTopNode().getRowType(), query);
+      if (druidJsonFilter != null) {
+        final RelNode newFilter = filter
+            .copy(filter.getTraitSet(), Util.last(query.rels), 
filter.getCondition());
+        final DruidQuery newDruidQuery = DruidQuery.extendQuery(query, 
newFilter);
+        call.transformTo(newDruidQuery);
+      }
+    }
+  }
+
+  /**
    * Rule to push a {@link org.apache.calcite.rel.core.Project} into a {@link 
DruidQuery}.
    */
   public static class DruidProjectRule extends RelOptRule {
@@ -343,14 +305,16 @@ public class DruidRules {
         return;
       }
 
-      if (canProjectAll(project.getProjects())) {
+      if (DruidQuery.computeProjectAsScan(project, 
query.getTable().getRowType(), query)
+          != null) {
         // All expressions can be pushed to Druid in their entirety.
         final RelNode newProject = project.copy(project.getTraitSet(),
-                ImmutableList.of(Util.last(query.rels)));
+            ImmutableList.of(Util.last(query.rels)));
         RelNode newNode = DruidQuery.extendQuery(query, newProject);
         call.transformTo(newNode);
         return;
       }
+
       final Pair<List<RexNode>, List<RexNode>> pair =
           splitProjects(rexBuilder, query, project.getProjects());
       if (pair == null) {
@@ -378,15 +342,6 @@ public class DruidRules {
       call.transformTo(newProject2);
     }
 
-    private static boolean canProjectAll(List<RexNode> nodes) {
-      for (RexNode e : nodes) {
-        if (!(e instanceof RexInputRef)) {
-          return false;
-        }
-      }
-      return true;
-    }
-
     private static Pair<List<RexNode>, List<RexNode>> splitProjects(final 
RexBuilder rexBuilder,
             final RelNode input, List<RexNode> nodes) {
       final RelOptUtil.InputReferencedVisitor visitor = new 
RelOptUtil.InputReferencedVisitor();
@@ -442,183 +397,37 @@ public class DruidRules {
     public void onMatch(RelOptRuleCall call) {
       Project project = call.rel(0);
       DruidQuery query = call.rel(1);
-      final RelOptCluster cluster = project.getCluster();
-      final RexBuilder rexBuilder = cluster.getRexBuilder();
       if (!DruidQuery.isValidSignature(query.signature() + 'o')) {
         return;
       }
-      Pair<ImmutableMap<String, String>, Boolean> scanned = scanProject(query, 
project);
-      // Only try to push down Project when there will be Post aggregators in 
result DruidQuery
-      if (scanned.right) {
-        Pair<Project, Project> splitProjectAggregate = 
splitProject(rexBuilder, query,
-                project, scanned.left, cluster);
-        Project inner = splitProjectAggregate.left;
-        Project outer = splitProjectAggregate.right;
-        DruidQuery newQuery = DruidQuery.extendQuery(query, inner);
-        // When all project get pushed into DruidQuery, the project can be 
replaced by DruidQuery.
-        if (outer != null) {
-          Project newProject = outer.copy(outer.getTraitSet(), newQuery, 
outer.getProjects(),
-              outer.getRowType());
-          call.transformTo(newProject);
-        } else {
-          call.transformTo(newQuery);
+      boolean hasRexCalls = false;
+      for (RexNode rexNode : project.getChildExps()) {
+        if (rexNode instanceof RexCall) {
+          hasRexCalls = true;
+          break;
         }
       }
-    }
+      // Only try to push down Project when there will be Post aggregators in 
result DruidQuery
+      if (hasRexCalls) {
 
-    /**
-     * Similar to split Project in DruidProjectRule. It used the name mapping 
from scanProject
-     * to render the correct field names of inner project so that the outer 
project can correctly
-     * refer to them. For RexNode that can be parsed into post aggregator, 
they will get pushed in
-     * before input reference, then outer project can simply refer to those 
pushed in RexNode to
-     * get result.
-     * @param rexBuilder builder from cluster
-     * @param query matched Druid Query
-     * @param project matched project takes in druid
-     * @param nameMap Result nameMapping from scanProject
-     * @param cluster cluster that provide builder for row type.
-     * @return Triple object contains inner project, outer project and required
-     *         Json Post Aggregation objects to be pushed down into Druid 
Query.
-     */
-    public Pair<Project, Project> splitProject(final RexBuilder rexBuilder,
-        DruidQuery query, Project project, ImmutableMap<String, String> 
nameMap,
-        final RelOptCluster cluster) {
-      //Visit & Build Inner Project
-      final List<RexNode> innerRex = new ArrayList<>();
-      final RelDataTypeFactory.Builder typeBuilder =
-          cluster.getTypeFactory().builder();
-      final RelOptUtil.InputReferencedVisitor visitor =
-          new RelOptUtil.InputReferencedVisitor();
-      final List<Integer> positions = new ArrayList<>();
-      final List<RelDataType> innerTypes = new ArrayList<>();
-      // Similar logic to splitProject in DruidProject Rule
-      // However, post aggregation will also be output of DruidQuery and they 
will be
-      // added before other input.
-      int offset = 0;
-      for (Pair<RexNode, String> pair : project.getNamedProjects()) {
-        RexNode rex = pair.left;
-        String name = pair.right;
-        String fieldName = nameMap.get(name);
-        if (fieldName == null) {
-          rex.accept(visitor);
+        final RelNode topNode = query.getTopNode();
+        final Aggregate topAgg;
+        if (topNode instanceof Aggregate) {
+          topAgg = (Aggregate) topNode;
         } else {
-          final RexNode node = rexBuilder.copy(rex);
-          innerRex.add(node);
-          positions.add(offset++);
-          typeBuilder.add(nameMap.get(name), node.getType());
-          innerTypes.add(node.getType());
+          topAgg = (Aggregate) ((Filter) topNode).getInput();
         }
-      }
-      // Other referred input will be added into the inner project rex list.
-      positions.addAll(visitor.inputPosReferenced);
-      for (int i : visitor.inputPosReferenced) {
-        final RexNode node = rexBuilder.makeInputRef(Util.last(query.rels), i);
-        innerRex.add(node);
-        typeBuilder.add(query.getRowType().getFieldNames().get(i), 
node.getType());
-        innerTypes.add(node.getType());
-      }
-      Project innerProject = project.copy(project.getTraitSet(), 
Util.last(query.rels), innerRex,
-          typeBuilder.build());
-      // If the whole project is pushed, we do not need to do anything else.
-      if (project.getNamedProjects().size() == nameMap.size()) {
-        return new Pair<>(innerProject, null);
-      }
-      // Build outer Project when some projects are left in outer project.
-      offset = 0;
-      final List<RexNode> outerRex = new ArrayList<>();
-      for (Pair<RexNode, String> pair : project.getNamedProjects()) {
-        RexNode rex = pair.left;
-        String name = pair.right;
-        if (!nameMap.containsKey(name)) {
-          outerRex.add(
-              rex.accept(
-                  new RexShuttle() {
-                    @Override public RexNode visitInputRef(RexInputRef ref) {
-                      final int j = positions.indexOf(ref.getIndex());
-                      return rexBuilder.makeInputRef(innerTypes.get(j), j);
-                    }
-                  }));
-        } else {
-          outerRex.add(
-              rexBuilder.makeInputRef(rex.getType(),
-                  positions.indexOf(offset++)));
-        }
-      }
-      Project outerProject = project.copy(project.getTraitSet(), innerProject,
-          outerRex, project.getRowType());
-      return new Pair<>(innerProject, outerProject);
-    }
-
-    /**
-     * Scans the project.
-     *
-     * <p>Takes Druid Query as input to figure out which expression can be
-     * pushed down. Also returns a map to show the correct field name in Druid
-     * Query for columns get pushed in.
-     *
-     * @param query matched Druid Query
-     * @param project Matched project that takes in Druid Query
-     * @return Pair that shows how name map with each other.
-     */
-    public Pair<ImmutableMap<String, String>, Boolean> scanProject(
-        DruidQuery query, Project project) {
-      List<String> aggNamesWithGroup = query.getRowType().getFieldNames();
-      final ImmutableMap.Builder<String, String> mapBuilder = 
ImmutableMap.builder();
-      int j = 0;
-      boolean ret = false;
-      for (Pair<RexNode, String> namedProject : project.getNamedProjects()) {
-        RexNode rex = namedProject.left;
-        String name = namedProject.right;
-        // Find out the corresponding fieldName for DruidQuery to fetch result
-        // in DruidConnectionImpl, give specific name for post aggregator
-        if (rex instanceof RexCall) {
-          if (checkPostAggregatorExist(rex)) {
-            String postAggName = "postagg#" + j++;
-            mapBuilder.put(name, postAggName);
-            ret = true;
-          }
-        } else if (rex instanceof RexInputRef) {
-          String fieldName = aggNamesWithGroup.get(((RexInputRef) 
rex).getIndex());
-          mapBuilder.put(name, fieldName);
-        }
-      }
-      return new Pair<>(mapBuilder.build(), ret);
-    }
 
-    /**
-     * Recursively check whether the rexNode can be parsed into post 
aggregator in druid query
-     * Have to fulfill conditions below:
-     * 1. Arithmetic operation +, -, /, * or CAST in SQL
-     * 2. Simple input reference refer to the result of Aggregate or Grouping
-     * 3. A constant
-     * 4. All input referred should also be able to be parsed
-     * @param rexNode input RexNode to be recursively checked
-     * @return a boolean shows whether this rexNode can be parsed or not.
-     */
-    public boolean checkPostAggregatorExist(RexNode rexNode) {
-      if (rexNode instanceof RexCall) {
-        for (RexNode ele : ((RexCall) rexNode).getOperands()) {
-          boolean inputRex = checkPostAggregatorExist(ele);
-          if (!inputRex) {
-            return false;
+        for (RexNode rexNode : project.getProjects()) {
+          if (DruidExpressions.toDruidExpression(rexNode, topAgg.getRowType(), 
query) == null) {
+            return;
           }
         }
-        switch (rexNode.getKind()) {
-        case PLUS:
-        case MINUS:
-        case DIVIDE:
-        case TIMES:
-        //case CAST:
-          return true;
-        default:
-          return false;
-        }
-      } else if (rexNode instanceof RexInputRef || rexNode instanceof 
RexLiteral) {
-        // Do not have to check the source of input because the signature 
checking ensure
-        // the input of project must be Aggregate.
-        return true;
+        final RelNode newProject = project
+            .copy(project.getTraitSet(), 
ImmutableList.of(Util.last(query.rels)));
+        final DruidQuery newQuery = DruidQuery.extendQuery(query, newProject);
+        call.transformTo(newQuery);
       }
-      return false;
     }
   }
 
@@ -640,28 +449,30 @@ public class DruidRules {
     public void onMatch(RelOptRuleCall call) {
       final Aggregate aggregate = call.rel(0);
       final DruidQuery query = call.rel(1);
+      final RelNode topDruidNode = query.getTopNode();
+      final Project project = topDruidNode instanceof Project ? (Project) 
topDruidNode : null;
       if (!DruidQuery.isValidSignature(query.signature() + 'a')) {
         return;
       }
 
       if (aggregate.indicator
-              || aggregate.getGroupSets().size() != 1
-              || BAD_AGG.apply(ImmutableTriple.of(aggregate, (RelNode) 
aggregate, query))
-              || !validAggregate(aggregate, query)) {
+          || aggregate.getGroupSets().size() != 1) {
         return;
       }
-      final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(),
-              ImmutableList.of(Util.last(query.rels)));
-      call.transformTo(DruidQuery.extendQuery(query, newAggregate));
-    }
-
-    /* Check whether agg functions reference timestamp */
-    private static boolean validAggregate(Aggregate aggregate, DruidQuery 
query) {
-      ImmutableBitSet.Builder builder = ImmutableBitSet.builder();
-      for (AggregateCall aggCall : aggregate.getAggCallList()) {
-        builder.addAll(aggCall.getArgList());
+      if (DruidQuery
+          .computeProjectGroupSet(project, aggregate.getGroupSet(), 
query.table.getRowType(), query)
+          == null) {
+        return;
+      }
+      final List<String> aggNames = Util
+          .skip(aggregate.getRowType().getFieldNames(), 
aggregate.getGroupSet().cardinality());
+      if (DruidQuery.computeDruidJsonAgg(aggregate.getAggCallList(), aggNames, 
project, query)
+          == null) {
+        return;
       }
-      return !checkTimestampRefOnQuery(builder.build(), query.getTopNode(), 
query);
+      final RelNode newAggregate = aggregate
+          .copy(aggregate.getTraitSet(), ImmutableList.of(query.getTopNode()));
+      call.transformTo(DruidQuery.extendQuery(query, newAggregate));
     }
   }
 
@@ -691,34 +502,26 @@ public class DruidRules {
       if (!DruidQuery.isValidSignature(query.signature() + 'p' + 'a')) {
         return;
       }
-
-      int timestampIdx = validProject(project, query);
-      List<Integer> filterRefs = getFilterRefs(aggregate.getAggCallList());
-
-      if (timestampIdx == -1 && filterRefs.size() == 0) {
+      if (aggregate.indicator
+          || aggregate.getGroupSets().size() != 1) {
         return;
       }
-
-      // Check that the filters that the Aggregate calls refer to are valid 
filters can be pushed
-      // into Druid
-      for (Integer i : filterRefs) {
-        RexNode filterNode = project.getProjects().get(i);
-        if (!query.isValidFilter(filterNode) || filterNode.isAlwaysFalse()) {
-          return;
-        }
+      if (DruidQuery
+          .computeProjectGroupSet(project, aggregate.getGroupSet(), 
query.table.getRowType(), query)
+          == null) {
+        return;
       }
-
-      if (aggregate.indicator
-              || aggregate.getGroupSets().size() != 1
-              || BAD_AGG.apply(ImmutableTriple.of(aggregate, (RelNode) 
project, query))
-              || !validAggregate(aggregate, timestampIdx, filterRefs.size())) {
+      final List<String> aggNames = Util
+          .skip(aggregate.getRowType().getFieldNames(), 
aggregate.getGroupSet().cardinality());
+      if (DruidQuery.computeDruidJsonAgg(aggregate.getAggCallList(), aggNames, 
project, query)
+          == null) {
         return;
       }
       final RelNode newProject = project.copy(project.getTraitSet(),
               ImmutableList.of(Util.last(query.rels)));
       final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(),
               ImmutableList.of(newProject));
-
+      List<Integer> filterRefs = getFilterRefs(aggregate.getAggCallList());
       final DruidQuery query2;
       if (filterRefs.size() > 0) {
         query2 = optimizeFilteredAggregations(call, query, (Project) 
newProject,
@@ -912,81 +715,6 @@ public class DruidRules {
       return refs;
     }
 
-    /* To be a valid Project, we allow it to contain references, and a single 
call
-     * to a FLOOR function on the timestamp column OR valid time EXTRACT on 
the timestamp column.
-     * Returns the reference to the timestamp, if any. */
-    private static int validProject(Project project, DruidQuery query) {
-      List<RexNode> nodes = project.getProjects();
-      int idxTimestamp = -1;
-      boolean hasFloor = false;
-      for (int i = 0; i < nodes.size(); i++) {
-        final RexNode e = nodes.get(i);
-        if (e instanceof RexCall) {
-          // It is a call, check that it is EXTRACT and follow-up conditions
-          final RexCall call = (RexCall) e;
-          final String timeZone = query.getCluster().getPlanner().getContext()
-              .unwrap(CalciteConnectionConfig.class).timeZone();
-          assert timeZone != null;
-          if (DruidDateTimeUtils.extractGranularity(call, timeZone) == null) {
-            return -1;
-          }
-          if (idxTimestamp != -1 && hasFloor) {
-            // Already one usage of timestamp column
-            return -1;
-          }
-          switch (call.getKind()) {
-          case FLOOR:
-            hasFloor = true;
-            if (!(call.getOperands().get(0) instanceof RexInputRef)) {
-              return -1;
-            }
-            final RexInputRef ref = (RexInputRef) call.getOperands().get(0);
-            if (!(checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()),
-                query.getTopNode(),
-                query))) {
-              return -1;
-            }
-            idxTimestamp = i;
-            break;
-          case EXTRACT:
-            idxTimestamp = RelOptUtil.InputFinder.bits(call).asList().get(0);
-            break;
-          default:
-            throw new AssertionError();
-          }
-          continue;
-        }
-        if (!(e instanceof RexInputRef)) {
-          // It needs to be a reference
-          return -1;
-        }
-        final RexInputRef ref = (RexInputRef) e;
-        if (checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()),
-                query.getTopNode(), query)) {
-          if (idxTimestamp != -1) {
-            // Already one usage of timestamp column
-            return -1;
-          }
-          idxTimestamp = i;
-        }
-      }
-      return idxTimestamp;
-    }
-
-    private static boolean validAggregate(Aggregate aggregate, int idx, int 
numFilterRefs) {
-      if (numFilterRefs > 0 && idx < 0) {
-        return true;
-      }
-      if (!aggregate.getGroupSet().get(idx)) {
-        return false;
-      }
-      for (AggregateCall aggCall : aggregate.getAggCallList()) {
-        if (aggCall.getArgList().contains(idx)) {
-          return false;
-        }
-      }
-      return true;
-    }
   }
 
   /**
@@ -1054,146 +782,20 @@ public class DruidRules {
         return;
       }
       // Either it is:
-      // - a sort and limit on a dimension/metric part of the druid group by 
query or
-      // - a sort without limit on the time column on top of
-      //     Agg operator (transformable to timeseries query), or
-      // - a simple limit on top of other operator than Agg
-      if (!validSortLimit(sort, query)) {
-        return;
-      }
-      final RelNode newSort = sort.copy(sort.getTraitSet(),
-              ImmutableList.of(Util.last(query.rels)));
-      call.transformTo(DruidQuery.extendQuery(query, newSort));
-    }
-
-    /** Checks whether sort is valid. */
-    private static boolean validSortLimit(Sort sort, DruidQuery query) {
+      // - a pure limit above a query of type scan
+      // - a sort and limit on a dimension/metric part of the druid group by 
query
       if (sort.offset != null && RexLiteral.intValue(sort.offset) != 0) {
         // offset not supported by Druid
-        return false;
-      }
-      // Use a different logic to push down Sort RelNode because the top node 
could be a Project now
-      RelNode topNode = query.getTopNode();
-      Aggregate topAgg;
-      if (topNode instanceof Project && ((Project) topNode).getInput() 
instanceof Aggregate) {
-        topAgg = (Aggregate) ((Project) topNode).getInput();
-      } else if (topNode instanceof Aggregate) {
-        topAgg = (Aggregate) topNode;
-      } else {
-        // If it is going to be a Druid select operator, we push the limit if
-        // it does not contain a sort specification (required by Druid)
-        return RelOptUtil.isPureLimit(sort);
-      }
-      final ImmutableBitSet.Builder positionsReferenced = 
ImmutableBitSet.builder();
-      for (RelFieldCollation col : sort.collation.getFieldCollations()) {
-        int idx = col.getFieldIndex();
-        if (idx >= topAgg.getGroupCount()) {
-          continue;
-        }
-        //has the indexes of the columns used for sorts
-        positionsReferenced.set(topAgg.getGroupSet().nth(idx));
-      }
-      // Case it is a timeseries query
-      if (checkIsFlooringTimestampRefOnQuery(topAgg.getGroupSet(), 
topAgg.getInput(), query)
-          && topAgg.getGroupCount() == 1) {
-        // do not push if it has a limit or more than one sort key or we have 
sort by
-        // metric/dimension
-        return !RelOptUtil.isLimit(sort) && 
sort.collation.getFieldCollations().size() == 1
-            && checkTimestampRefOnQuery(positionsReferenced.build(), 
topAgg.getInput(), query);
-      }
-      return true;
-    }
-  }
-
-  /** Returns true if any of the grouping key is a floor operator over the 
timestamp column. */
-  private static boolean checkIsFlooringTimestampRefOnQuery(ImmutableBitSet 
set, RelNode top,
-      DruidQuery query) {
-    if (top instanceof Project) {
-      ImmutableBitSet.Builder newSet = ImmutableBitSet.builder();
-      final Project project = (Project) top;
-      for (int index : set) {
-        RexNode node = project.getProjects().get(index);
-        if (node instanceof RexCall) {
-          RexCall call = (RexCall) node;
-          final String timeZone = query.getCluster().getPlanner().getContext()
-              .unwrap(CalciteConnectionConfig.class).timeZone();
-          assert timeZone != null;
-          assert DruidDateTimeUtils.extractGranularity(call, timeZone) != null;
-          if (call.getKind() == SqlKind.FLOOR) {
-            newSet.addAll(RelOptUtil.InputFinder.bits(call));
-          }
-        }
-      }
-      top = project.getInput();
-      set = newSet.build();
-    }
-    // Check if any references the timestamp column
-    for (int index : set) {
-      if (query.druidTable.timestampFieldName.equals(
-          top.getRowType().getFieldNames().get(index))) {
-        return true;
-      }
-    }
-
-    return false;
-  }
-
-  /** Checks whether any of the references leads to the timestamp column. */
-  private static boolean checkTimestampRefOnQuery(ImmutableBitSet set, RelNode 
top,
-      DruidQuery query) {
-    if (top instanceof Project) {
-      ImmutableBitSet.Builder newSet = ImmutableBitSet.builder();
-      final Project project = (Project) top;
-      for (int index : set) {
-        RexNode node = project.getProjects().get(index);
-        if (node instanceof RexInputRef) {
-          newSet.set(((RexInputRef) node).getIndex());
-        } else if (node instanceof RexCall) {
-          RexCall call = (RexCall) node;
-          final String timeZone = query.getCluster().getPlanner().getContext()
-              .unwrap(CalciteConnectionConfig.class).timeZone();
-          assert timeZone != null;
-          assert DruidDateTimeUtils.extractGranularity(call, timeZone) != null;
-          // when we have extract from time column the rexCall is of the form
-          // "/Reinterpret$0"
-          newSet.addAll(RelOptUtil.InputFinder.bits(call));
-        }
+        return;
       }
-      top = project.getInput();
-      set = newSet.build();
-    }
-
-    // Check if any references the timestamp column
-    for (int index : set) {
-      if (query.druidTable.timestampFieldName.equals(
-              top.getRowType().getFieldNames().get(index))) {
-        return true;
+      if (query.getQueryType() == QueryType.SCAN && 
!RelOptUtil.isPureLimit(sort)) {
+        return;
       }
-    }
 
-    return false;
-  }
-
-  /** Checks whether any of the references leads to a metric column. */
-  private static boolean checkAggregateOnMetric(ImmutableBitSet set, RelNode 
topProject,
-      DruidQuery query) {
-    if (topProject instanceof Project) {
-      ImmutableBitSet.Builder newSet = ImmutableBitSet.builder();
-      final Project project = (Project) topProject;
-      for (int index : set) {
-        RexNode node = project.getProjects().get(index);
-        ImmutableBitSet setOfBits = RelOptUtil.InputFinder.bits(node);
-        newSet.addAll(setOfBits);
-      }
-      set = newSet.build();
-    }
-    for (int index : set) {
-      if 
(query.druidTable.isMetric(query.getTopNode().getRowType().getFieldNames().get(index)))
 {
-        return true;
-      }
+      final RelNode newSort = sort
+          .copy(sort.getTraitSet(), ImmutableList.of(Util.last(query.rels)));
+      call.transformTo(DruidQuery.extendQuery(query, newSort));
     }
-
-    return false;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlCastConverter.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlCastConverter.java
 
b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlCastConverter.java
new file mode 100644
index 0000000..0731a6f
--- /dev/null
+++ 
b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlCastConverter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Period;
+
+import java.util.TimeZone;
+
+/**
+ * Druid cast converter operator used to translate Calcite casts to Druid 
expression casts
+ */
+public class DruidSqlCastConverter implements DruidSqlOperatorConverter {
+
+  @Override public SqlOperator calciteOperator() {
+    return SqlStdOperatorTable.CAST;
+  }
+
+  @Override public String toDruidExpression(RexNode rexNode, RelDataType 
topRel,
+      DruidQuery druidQuery) {
+
+    final RexNode operand = ((RexCall) rexNode).getOperands().get(0);
+    final String operandExpression = 
DruidExpressions.toDruidExpression(operand,
+        topRel, druidQuery);
+
+    if (operandExpression == null) {
+      return null;
+    }
+
+    final SqlTypeName fromType = operand.getType().getSqlTypeName();
+    final SqlTypeName toType = rexNode.getType().getSqlTypeName();
+    final String timeZoneConf = druidQuery.getConnectionConfig().timeZone();
+    final TimeZone timeZone = TimeZone.getTimeZone(timeZoneConf == null ? 
"UTC" : timeZoneConf);
+
+    if (SqlTypeName.CHAR_TYPES.contains(fromType) && 
SqlTypeName.DATETIME_TYPES.contains(toType)) {
+      //case chars to dates
+      return castCharToDateTime(timeZone, operandExpression,
+          toType);
+    } else if (SqlTypeName.DATETIME_TYPES.contains(fromType) && 
SqlTypeName.CHAR_TYPES.contains
+        (toType)) {
+      //case dates to chars
+      return castDateTimeToChar(timeZone, operandExpression,
+          fromType);
+    } else {
+      // Handle other casts.
+      final DruidType fromExprType = 
DruidExpressions.EXPRESSION_TYPES.get(fromType);
+      final DruidType toExprType = 
DruidExpressions.EXPRESSION_TYPES.get(toType);
+
+      if (fromExprType == null || toExprType == null) {
+        // Unknown types bail out.
+        return null;
+      }
+      final String typeCastExpression;
+      if (fromExprType != toExprType) {
+        typeCastExpression = DruidQuery.format("CAST(%s, '%s')", 
operandExpression,
+            toExprType
+            .toString());
+      } else {
+        // case it is the same type it is ok to skip CAST
+        typeCastExpression = operandExpression;
+      }
+
+      if (toType == SqlTypeName.DATE) {
+        // Floor to day when casting to DATE.
+        return DruidExpressions.applyTimestampFloor(
+            typeCastExpression,
+            Period.days(1).toString(),
+            "",
+            TimeZone.getTimeZone(druidQuery.getConnectionConfig().timeZone()));
+      } else {
+        return typeCastExpression;
+      }
+
+    }
+  }
+
+  private static String castCharToDateTime(
+      TimeZone timeZone,
+      String operand,
+      final SqlTypeName toType) {
+    // Cast strings to date times by parsing them from SQL format.
+    final String timestampExpression = DruidExpressions.functionCall(
+        "timestamp_parse",
+        ImmutableList.of(
+            operand,
+            DruidExpressions.stringLiteral(""),
+            DruidExpressions.stringLiteral(timeZone.getID())));
+
+    if (toType == SqlTypeName.DATE) {
+      // case to date we need to floor to day first
+      return DruidExpressions.applyTimestampFloor(
+          timestampExpression,
+          Period.days(1).toString(),
+          "",
+          timeZone);
+    } else if (toType == SqlTypeName.TIMESTAMP || toType == SqlTypeName
+        .TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+      return timestampExpression;
+    } else {
+      throw new IllegalStateException(
+          DruidQuery.format("Unsupported DateTime type[%s]", toType));
+    }
+  }
+
+  private static String castDateTimeToChar(
+      final TimeZone timeZone,
+      final String operand,
+      final SqlTypeName fromType) {
+    return DruidExpressions.functionCall(
+        "timestamp_format",
+        ImmutableList.of(
+            operand,
+            DruidExpressions.stringLiteral(dateTimeFormatString(fromType)),
+            DruidExpressions.stringLiteral(timeZone.getID())));
+  }
+
+  public static String dateTimeFormatString(final SqlTypeName sqlTypeName) {
+    if (sqlTypeName == SqlTypeName.DATE) {
+      return "yyyy-MM-dd";
+    } else if (sqlTypeName == SqlTypeName.TIMESTAMP) {
+      return "yyyy-MM-dd HH:mm:ss";
+    } else if (sqlTypeName == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+      return "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+    } else {
+      return null;
+    }
+  }
+}
+
+// End DruidSqlCastConverter.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlOperatorConverter.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlOperatorConverter.java
 
b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlOperatorConverter.java
new file mode 100644
index 0000000..0ee179a
--- /dev/null
+++ 
b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlOperatorConverter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+
+import javax.annotation.Nullable;
+
+/**
+ * Defines how to convert a RexNode with a given Calcite SQL operator to Druid 
expressions
+ */
+public interface DruidSqlOperatorConverter {
+
+  /**
+   * Returns the Calcite SQL operator corresponding to the Druid operator.
+   *
+   * @return operator
+   */
+  SqlOperator calciteOperator();
+
+
+  /**
+   * Translate rexNode to valid Druid expression.
+   * @param rexNode rexNode to translate to Druid expression
+   * @param rowType row type associated with rexNode
+   * @param druidQuery druid query used to figure out configs/fields related 
like timeZone
+   *
+   * @return valid Druid expression, or null if it cannot convert the rexNode
+   */
+  @Nullable String toDruidExpression(RexNode rexNode, RelDataType rowType, 
DruidQuery druidQuery);
+}
+
+// End DruidSqlOperatorConverter.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/DruidType.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidType.java 
b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidType.java
index f50fdfd..ec601b7 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidType.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidType.java
@@ -21,10 +21,10 @@ import org.apache.calcite.sql.type.SqlTypeName;
 /** Druid type. */
 public enum DruidType {
   LONG(SqlTypeName.BIGINT),
-  // SQL DOUBLE and FLOAT types are both 64 bit, but we use DOUBLE because
-  // people find FLOAT confusing.
-  FLOAT(SqlTypeName.DOUBLE),
+  FLOAT(SqlTypeName.FLOAT),
+  DOUBLE(SqlTypeName.DOUBLE),
   STRING(SqlTypeName.VARCHAR),
+  COMPLEX(SqlTypeName.OTHER),
   HYPER_UNIQUE(SqlTypeName.VARBINARY),
   THETA_SKETCH(SqlTypeName.VARBINARY);
 
@@ -39,13 +39,13 @@ public enum DruidType {
    * Returns true if and only if this enum should be used inside of a {@link 
ComplexMetric}
    * */
   public boolean isComplex() {
-    return this == THETA_SKETCH || this == HYPER_UNIQUE;
+    return this == THETA_SKETCH || this == HYPER_UNIQUE || this == COMPLEX;
   }
 
   /**
    * Returns a DruidType matching the given String type from a Druid metric
    * */
-  public static DruidType getTypeFromMetric(String type) {
+  protected static DruidType getTypeFromMetric(String type) {
     assert type != null;
     if (type.equals("hyperUnique")) {
       return HYPER_UNIQUE;
@@ -54,6 +54,8 @@ public enum DruidType {
     } else if (type.startsWith("long") || type.equals("count")) {
       return LONG;
     } else if (type.startsWith("double")) {
+      return DOUBLE;
+    } else if (type.startsWith("float")) {
       return FLOAT;
     }
     throw new AssertionError("Unknown type: " + type);
@@ -62,13 +64,15 @@ public enum DruidType {
   /**
    * Returns a DruidType matching the String from a meta data query
    * */
-  public static DruidType getTypeFromMetaData(String type) {
+  protected static DruidType getTypeFromMetaData(String type) {
     assert type != null;
     switch (type) {
     case "LONG":
       return LONG;
     case "FLOAT":
       return FLOAT;
+    case "DOUBLE":
+      return DOUBLE;
     case "STRING":
       return STRING;
     default:

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractOperatorConversion.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractOperatorConversion.java
 
b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractOperatorConversion.java
new file mode 100644
index 0000000..6e35540
--- /dev/null
+++ 
b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractOperatorConversion.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * Time extract operator conversion for expressions like EXTRACT(timeUnit FROM 
arg)
+ * Unit can be SECOND, MINUTE, HOUR, DAY (day of month),
+ * DOW (day of week), DOY (day of year), WEEK (week of week year),
+ * MONTH (1 through 12), QUARTER (1 through 4), or YEAR
+ **/
+public class ExtractOperatorConversion implements DruidSqlOperatorConverter {
+  private static final Map<TimeUnitRange, String> EXTRACT_UNIT_MAP =
+      ImmutableMap.<TimeUnitRange, String>builder()
+          .put(TimeUnitRange.SECOND, "SECOND")
+          .put(TimeUnitRange.MINUTE, "MINUTE")
+          .put(TimeUnitRange.HOUR, "HOUR")
+          .put(TimeUnitRange.DAY, "DAY")
+          .put(TimeUnitRange.DOW, "DOW")
+          .put(TimeUnitRange.DOY, "DOY")
+          .put(TimeUnitRange.WEEK, "WEEK")
+          .put(TimeUnitRange.MONTH, "MONTH")
+          .put(TimeUnitRange.QUARTER, "QUARTER")
+          .put(TimeUnitRange.YEAR, "YEAR")
+          .build();
+
+  @Override public SqlOperator calciteOperator() {
+    return SqlStdOperatorTable.EXTRACT;
+  }
+
+  @Override public String toDruidExpression(
+      RexNode rexNode, RelDataType rowType, DruidQuery query) {
+
+    final RexCall call = (RexCall) rexNode;
+    final RexLiteral flag = (RexLiteral) call.getOperands().get(0);
+    final TimeUnitRange calciteUnit = (TimeUnitRange) flag.getValue();
+    final RexNode arg = call.getOperands().get(1);
+
+    final String input = DruidExpressions.toDruidExpression(arg, rowType, 
query);
+    if (input == null) {
+      return null;
+    }
+
+    final String druidUnit = EXTRACT_UNIT_MAP.get(calciteUnit);
+    if (druidUnit == null) {
+      return null;
+    }
+
+    return DruidExpressions.applyTimeExtract(
+        input, druidUnit, 
TimeZone.getTimeZone(query.getConnectionConfig().timeZone()));
+  }
+}
+
+// End ExtractOperatorConversion.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java
 
b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java
index 601fc89..0aece36 100644
--- 
a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java
+++ 
b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
 
 import java.io.IOException;
 
+import javax.annotation.Nullable;
+
 import static org.apache.calcite.adapter.druid.DruidQuery.writeField;
 import static org.apache.calcite.adapter.druid.DruidQuery.writeFieldIf;
 
@@ -34,18 +36,37 @@ public class ExtractionDimensionSpec implements 
DimensionSpec {
   private final String dimension;
   private final ExtractionFunction extractionFunction;
   private final String outputName;
+  private final DruidType outputType;
 
   public ExtractionDimensionSpec(String dimension, ExtractionFunction 
extractionFunction,
       String outputName) {
+    this(dimension, extractionFunction, outputName, DruidType.STRING);
+  }
+
+  public ExtractionDimensionSpec(String dimension, ExtractionFunction 
extractionFunction,
+      String outputName, DruidType outputType) {
     this.dimension = Preconditions.checkNotNull(dimension);
     this.extractionFunction = Preconditions.checkNotNull(extractionFunction);
     this.outputName = outputName;
+    this.outputType = outputType == null ? DruidType.STRING : outputType;
   }
 
-  public String getOutputName() {
+  @Override public String getOutputName() {
     return outputName;
   }
 
+  @Override public DruidType getOutputType() {
+    return outputType;
+  }
+
+  @Override public ExtractionFunction getExtractionFn() {
+    return extractionFunction;
+  }
+
+  @Override public String getDimension() {
+    return dimension;
+  }
+
   @Override public void write(JsonGenerator generator) throws IOException {
     generator.writeStartObject();
     generator.writeStringField("type", "extraction");
@@ -55,6 +76,33 @@ public class ExtractionDimensionSpec implements 
DimensionSpec {
     generator.writeEndObject();
   }
 
+  /**
+   * @param dimensionSpec Druid Dimension spec object
+   *
+   * @return valid {@link Granularity} of floor extract or null when not 
possible.
+   */
+  @Nullable
+  public static Granularity toQueryGranularity(DimensionSpec dimensionSpec) {
+    if 
(!DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(dimensionSpec.getDimension())) {
+      // Only __time column can be substituted by granularity
+      return null;
+    }
+    final ExtractionFunction extractionFunction = 
dimensionSpec.getExtractionFn();
+    if (extractionFunction == null) {
+      // No Extract thus no Granularity
+      return null;
+    }
+    if (extractionFunction instanceof TimeExtractionFunction) {
+      Granularity granularity = ((TimeExtractionFunction) 
extractionFunction).getGranularity();
+      String format = ((TimeExtractionFunction) 
extractionFunction).getFormat();
+      if (!TimeExtractionFunction.ISO_TIME_FORMAT.equals(format)) {
+        return null;
+      }
+      return granularity;
+    }
+    return null;
+  }
+
 }
 
 // End ExtractionDimensionSpec.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java 
b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java
index 8143f8c..d572514 100644
--- 
a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java
+++ 
b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java
@@ -21,7 +21,7 @@ package org.apache.calcite.adapter.druid;
  *
  * <p>Extraction functions define the transformation applied to each dimension 
value.
  */
-public interface ExtractionFunction extends DruidQuery.Json {
+public interface ExtractionFunction extends DruidJson {
 }
 
 // End ExtractionFunction.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/FloorOperatorConversion.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/FloorOperatorConversion.java
 
b/druid/src/main/java/org/apache/calcite/adapter/druid/FloorOperatorConversion.java
new file mode 100644
index 0000000..0d8ecc1
--- /dev/null
+++ 
b/druid/src/main/java/org/apache/calcite/adapter/druid/FloorOperatorConversion.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+
+import java.util.TimeZone;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * DruidSqlOperatorConverter implementation that handles Floor operations 
conversions
+ */
+public class FloorOperatorConversion implements DruidSqlOperatorConverter {
+  @Override public SqlOperator calciteOperator() {
+    return SqlStdOperatorTable.FLOOR;
+  }
+
+  @Nullable
+  @Override public String toDruidExpression(RexNode rexNode, RelDataType 
rowType,
+      DruidQuery druidQuery) {
+    final RexCall call = (RexCall) rexNode;
+    final RexNode arg = call.getOperands().get(0);
+    final String druidExpression = DruidExpressions.toDruidExpression(
+        arg,
+        rowType,
+        druidQuery);
+    if (druidExpression == null) {
+      return null;
+    } else if (call.getOperands().size() == 1) {
+      // case FLOOR(expr)
+      return  DruidQuery.format("floor(%s)", druidExpression);
+    } else if (call.getOperands().size() == 2) {
+      // FLOOR(expr TO timeUnit)
+      final Granularity granularity = DruidDateTimeUtils
+          .extractGranularity(call, 
druidQuery.getConnectionConfig().timeZone());
+      if (granularity == null) {
+        return null;
+      }
+      String isoPeriodFormat = 
DruidDateTimeUtils.toISOPeriodFormat(granularity.getType());
+      if (isoPeriodFormat == null) {
+        return null;
+      }
+      return DruidExpressions.applyTimestampFloor(
+          druidExpression,
+          isoPeriodFormat,
+          "",
+          TimeZone.getTimeZone(druidQuery.getConnectionConfig().timeZone()));
+    } else {
+      return null;
+    }
+  }
+}
+
+// End FloorOperatorConversion.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/Granularities.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/Granularities.java 
b/druid/src/main/java/org/apache/calcite/adapter/druid/Granularities.java
index df1b291..2015075 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/Granularities.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/Granularities.java
@@ -72,9 +72,7 @@ public class Granularities {
     INSTANCE;
 
     @Override public void write(JsonGenerator generator) throws IOException {
-      generator.writeStartObject();
-      generator.writeStringField("type", "all");
-      generator.writeEndObject();
+      generator.writeObject("all");
     }
 
     @Nonnull public Type getType() {

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/Granularity.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/Granularity.java 
b/druid/src/main/java/org/apache/calcite/adapter/druid/Granularity.java
index ffedec2..f70fd18 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/Granularity.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/Granularity.java
@@ -32,7 +32,7 @@ import javax.annotation.Nonnull;
  *
  * @see Granularities
  */
-public interface Granularity extends DruidQuery.Json {
+public interface Granularity extends DruidJson {
   /** Type of supported periods for granularity. */
   enum Type {
     ALL,

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/NaryOperatorConverter.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/NaryOperatorConverter.java
 
b/druid/src/main/java/org/apache/calcite/adapter/druid/NaryOperatorConverter.java
new file mode 100644
index 0000000..961454b
--- /dev/null
+++ 
b/druid/src/main/java/org/apache/calcite/adapter/druid/NaryOperatorConverter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * Converts Calcite n-ary operators to druid expression eg (arg1 Op arg2 Op 
arg3)
+ */
+public class NaryOperatorConverter implements DruidSqlOperatorConverter {
+  private final SqlOperator operator;
+  private final String druidOperatorName;
+
+  public NaryOperatorConverter(SqlOperator operator, String druidOperatorName) 
{
+    this.operator = Preconditions.checkNotNull(operator);
+    this.druidOperatorName = Preconditions.checkNotNull(druidOperatorName);
+  }
+
+  @Override public SqlOperator calciteOperator() {
+    return operator;
+  }
+
+  @Nullable
+  @Override public String toDruidExpression(RexNode rexNode, RelDataType 
rowType,
+      DruidQuery druidQuery) {
+    final RexCall call = (RexCall) rexNode;
+    final List<String> druidExpressions = DruidExpressions.toDruidExpressions(
+        druidQuery, rowType,
+        call.getOperands());
+    if (druidExpressions == null) {
+      return null;
+    }
+    return DruidExpressions.nAryOperatorCall(druidOperatorName, 
druidExpressions);
+  }
+}
+
+// End NaryOperatorConverter.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/SubstringOperatorConversion.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/SubstringOperatorConversion.java
 
b/druid/src/main/java/org/apache/calcite/adapter/druid/SubstringOperatorConversion.java
new file mode 100644
index 0000000..d2342f3
--- /dev/null
+++ 
b/druid/src/main/java/org/apache/calcite/adapter/druid/SubstringOperatorConversion.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+
+import javax.annotation.Nullable;
+
+/**
+ * Converts Calcite SUBSTRING call to Druid Expression when possible
+ */
+public class SubstringOperatorConversion implements DruidSqlOperatorConverter {
+  @Override public SqlOperator calciteOperator() {
+    return SqlStdOperatorTable.SUBSTRING;
+  }
+
+  @Nullable
+  @Override public String toDruidExpression(RexNode rexNode, RelDataType 
rowType,
+      DruidQuery query) {
+    final RexCall call = (RexCall) rexNode;
+    final String arg = DruidExpressions.toDruidExpression(
+        call.getOperands().get(0), rowType, query);
+    if (arg == null) {
+      return null;
+    }
+
+    final int index = RexLiteral.intValue(call.getOperands().get(1)) - 1;
+    // SQL is 1-indexed, Druid is 0-indexed.
+    final int length;
+    if (call.getOperands().size() > 2) {
+      //case substring from index with length
+      length = RexLiteral.intValue(call.getOperands().get(2));
+    } else {
+      //case substring from index to the end
+      length = -1;
+    }
+    return DruidQuery.format("substring(%s, %s, %s)",
+        arg,
+        DruidExpressions.numberLiteral(index),
+        DruidExpressions.numberLiteral(length));
+  }
+}
+
+// End SubstringOperatorConversion.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
 
b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
deleted file mode 100644
index 7ef19a6..0000000
--- 
a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.druid;
-
/**
 * DimensionSpec implementation that uses a time format extraction function.
 *
 * <p>Always operates on Druid's built-in '__time' column
 * ({@code DruidTable.DEFAULT_TIMESTAMP_COLUMN}).
 */
public class TimeExtractionDimensionSpec extends ExtractionDimensionSpec {

  public TimeExtractionDimensionSpec(
      ExtractionFunction extractionFunction, String outputName) {
    super(DruidTable.DEFAULT_TIMESTAMP_COLUMN, extractionFunction, outputName);
  }

  /**
   * Creates a time extraction DimensionSpec that renames the '__time' column
   * to the given name.
   *
   * @param outputName name of the output column
   * @param timeZone time zone id applied when formatting the timestamp
   *
   * @return the time extraction DimensionSpec instance
   */
  public static TimeExtractionDimensionSpec makeFullTimeExtract(
      String outputName, String timeZone) {
    return new TimeExtractionDimensionSpec(
        TimeExtractionFunction.createDefault(timeZone), outputName);
  }

  /**
   * Creates a time extraction DimensionSpec that formats the '__time' column
   * according to the given granularity and outputs the column with the given
   * name. See {@link TimeExtractionFunction#VALID_TIME_EXTRACT} for set of valid extract
   *
   * @param granularity granularity to apply to the column
   * @param outputName  name of the output column
   * @param timeZone time zone id applied when extracting the time field
   *
   * @return time field extraction DimensionSpec instance or null if granularity
   * is not supported
   */
  public static TimeExtractionDimensionSpec makeTimeExtract(
      Granularity granularity, String outputName, String timeZone) {
    return new TimeExtractionDimensionSpec(
        TimeExtractionFunction.createExtractFromGranularity(granularity, timeZone), outputName);
  }

  /**
   * Creates floor time extraction dimension spec from Granularity with a given output name
   *
   * @param granularity granularity to apply to the time column
   * @param outputName name of the output column
   * @param timeZone time zone id applied when flooring the timestamp
   *
   * @return floor time extraction DimensionSpec instance.
   */
  public static TimeExtractionDimensionSpec makeTimeFloor(Granularity granularity,
      String outputName, String timeZone) {
    ExtractionFunction fn =
        TimeExtractionFunction.createFloorFromGranularity(granularity, timeZone);
    return new TimeExtractionDimensionSpec(fn, outputName);
  }
}
-
-// End TimeExtractionDimensionSpec.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
 
b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
index 61e72e0..5b0265e 100644
--- 
a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
+++ 
b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
@@ -22,12 +22,18 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
+
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
 import java.io.IOException;
 import java.util.Locale;
+import java.util.TimeZone;
+
+import javax.annotation.Nullable;
 
 import static org.apache.calcite.adapter.druid.DruidQuery.writeFieldIf;
 
@@ -51,6 +57,15 @@ public class TimeExtractionFunction implements 
ExtractionFunction {
       TimeUnitRange.MINUTE,
       TimeUnitRange.SECOND);
 
+  private static final ImmutableSet<TimeUnitRange> VALID_TIME_FLOOR = 
Sets.immutableEnumSet(
+      TimeUnitRange.YEAR,
+      TimeUnitRange.MONTH,
+      TimeUnitRange.DAY,
+      TimeUnitRange.WEEK,
+      TimeUnitRange.HOUR,
+      TimeUnitRange.MINUTE,
+      TimeUnitRange.SECOND);
+
   public static final String ISO_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
 
   private final String format;
@@ -76,6 +91,14 @@ public class TimeExtractionFunction implements 
ExtractionFunction {
     generator.writeEndObject();
   }
 
+  public String getFormat() {
+    return format;
+  }
+  public Granularity getGranularity() {
+    return granularity;
+  }
+
+
   /**
    * Creates the default time format extraction function.
    *
@@ -94,7 +117,7 @@ public class TimeExtractionFunction implements 
ExtractionFunction {
    */
   public static TimeExtractionFunction createExtractFromGranularity(
       Granularity granularity, String timeZone) {
-    final String local = Locale.ROOT.toLanguageTag();
+    final String local = Locale.US.toLanguageTag();
     switch (granularity.getType()) {
     case DAY:
       return new TimeExtractionFunction("d", null, timeZone, local);
@@ -135,11 +158,12 @@ public class TimeExtractionFunction implements 
ExtractionFunction {
    *
    * @return true if the extract unit is valid
    */
+
   public static boolean isValidTimeExtract(RexNode rexNode) {
-    if (rexNode.getKind() != SqlKind.EXTRACT) {
+    final RexCall call = (RexCall) rexNode;
+    if (call.getKind() != SqlKind.EXTRACT || call.getOperands().size() != 2) {
       return false;
     }
-    final RexCall call = (RexCall) rexNode;
     final RexLiteral flag = (RexLiteral) call.operands.get(0);
     final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue();
     return timeUnit != null && VALID_TIME_EXTRACT.contains(timeUnit);
@@ -163,7 +187,38 @@ public class TimeExtractionFunction implements 
ExtractionFunction {
     }
     final RexLiteral flag = (RexLiteral) call.operands.get(1);
     final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue();
-    return timeUnit != null && VALID_TIME_EXTRACT.contains(timeUnit);
+    return timeUnit != null && VALID_TIME_FLOOR.contains(timeUnit);
+  }
+
+  /**
+   * @param rexNode cast RexNode
+   * @param timeZone timezone
+   *
+   * @return Druid Time extraction function or null when can not translate the 
cast.
+   */
+  @Nullable
+  public static TimeExtractionFunction translateCastToTimeExtract(RexNode 
rexNode,
+      TimeZone timeZone) {
+    assert rexNode.getKind() == SqlKind.CAST;
+    final RexCall rexCall = (RexCall) rexNode;
+    final String castFormat = DruidSqlCastConverter
+        .dateTimeFormatString(rexCall.getType().getSqlTypeName());
+    final String timeZoneId = timeZone == null ? null : timeZone.getID();
+    if (castFormat == null) {
+      // unknown format
+      return null;
+    }
+    if (rexCall.getType().getFamily() == SqlTypeFamily.DATE) {
+      return new TimeExtractionFunction(castFormat,
+          Granularities.createGranularity(TimeUnitRange.DAY, timeZoneId), 
timeZoneId,
+          Locale.ENGLISH.toString());
+    }
+    if (rexCall.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP
+        || rexCall.getType().getSqlTypeName() == 
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+      return new TimeExtractionFunction(castFormat, null, timeZoneId, 
Locale.ENGLISH.toString());
+    }
+
+    return null;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/UnaryPrefixOperatorConversion.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/UnaryPrefixOperatorConversion.java
 
b/druid/src/main/java/org/apache/calcite/adapter/druid/UnaryPrefixOperatorConversion.java
new file mode 100644
index 0000000..a8e5da3
--- /dev/null
+++ 
b/druid/src/main/java/org/apache/calcite/adapter/druid/UnaryPrefixOperatorConversion.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+
+import com.google.common.collect.Iterables;
+
+import java.util.List;
+
+/**
+ * Unary prefix Operator conversion class used to convert expression like 
Unary NOT and Minus
+ */
+public class UnaryPrefixOperatorConversion implements 
DruidSqlOperatorConverter {
+
+  private final SqlOperator operator;
+  private final String druidOperator;
+
+  public UnaryPrefixOperatorConversion(final SqlOperator operator, final 
String druidOperator) {
+    this.operator = operator;
+    this.druidOperator = druidOperator;
+  }
+
+  @Override public SqlOperator calciteOperator() {
+    return operator;
+  }
+
+  @Override public String toDruidExpression(RexNode rexNode, RelDataType 
rowType,
+      DruidQuery druidQuery) {
+
+    final RexCall call = (RexCall) rexNode;
+
+    final List<String> druidExpressions = DruidExpressions.toDruidExpressions(
+        druidQuery, rowType,
+        call.getOperands());
+
+    if (druidExpressions == null) {
+      return null;
+    }
+
+    return DruidQuery
+        .format("(%s %s)", druidOperator, 
Iterables.getOnlyElement(druidExpressions));
+  }
+}
+
+// End UnaryPrefixOperatorConversion.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/UnarySuffixOperatorConversion.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/UnarySuffixOperatorConversion.java
 
b/druid/src/main/java/org/apache/calcite/adapter/druid/UnarySuffixOperatorConversion.java
new file mode 100644
index 0000000..015415f
--- /dev/null
+++ 
b/druid/src/main/java/org/apache/calcite/adapter/druid/UnarySuffixOperatorConversion.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+
+import com.google.common.collect.Iterables;
+
+import java.util.List;
+
+/**
+ * Unary suffix operator conversion, used to convert function like: expression 
Unary_Operator
+ */
+public class UnarySuffixOperatorConversion implements 
DruidSqlOperatorConverter {
+  private final SqlOperator operator;
+  private final String druidOperator;
+
+  public UnarySuffixOperatorConversion(SqlOperator operator, String 
druidOperator) {
+    this.operator = operator;
+    this.druidOperator = druidOperator;
+  }
+
+  @Override public SqlOperator calciteOperator() {
+    return operator;
+  }
+
+  @Override public String toDruidExpression(RexNode rexNode, RelDataType 
rowType,
+      DruidQuery druidQuery) {
+    final RexCall call = (RexCall) rexNode;
+
+    final List<String> druidExpressions = DruidExpressions.toDruidExpressions(
+        druidQuery, rowType,
+        call.getOperands());
+
+    if (druidExpressions == null) {
+      return null;
+    }
+
+    return DruidQuery.format(
+            "(%s %s)",
+            Iterables.getOnlyElement(druidExpressions), druidOperator);
+  }
+}
+
+// End UnarySuffixOperatorConversion.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/main/java/org/apache/calcite/adapter/druid/VirtualColumn.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/VirtualColumn.java 
b/druid/src/main/java/org/apache/calcite/adapter/druid/VirtualColumn.java
new file mode 100644
index 0000000..7348cec
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/VirtualColumn.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.Locale;
+
+import static org.apache.calcite.adapter.druid.DruidQuery.writeFieldIf;
+
+/**
+ * Druid Json Expression based Virtual Column.
+ * Virtual columns is used as "projection" concept throughout Druid using 
expression.
+ */
+public class VirtualColumn implements DruidJson {
+  private final String name;
+
+  private final String expression;
+
+  private final DruidType outputType;
+
+  public VirtualColumn(String name, String expression, DruidType outputType) {
+    this.name = Preconditions.checkNotNull(name);
+    this.expression = Preconditions.checkNotNull(expression);
+    this.outputType = outputType == null ? DruidType.FLOAT : outputType;
+  }
+
+  @Override public void write(JsonGenerator generator) throws IOException {
+    generator.writeStartObject();
+    generator.writeStringField("type", "expression");
+    generator.writeStringField("name", name);
+    generator.writeStringField("expression", expression);
+    writeFieldIf(generator, "outputType", 
getOutputType().toString().toUpperCase(Locale.ENGLISH));
+    generator.writeEndObject();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getExpression() {
+    return expression;
+  }
+
+  public DruidType getOutputType() {
+    return outputType;
+  }
+
+  /**
+   * Virtual Column Builder
+   */
+  public static class Builder {
+    private String name;
+
+    private String expression;
+
+    private DruidType type;
+
+    public Builder withName(String name) {
+      this.name = name;
+      return this;
+    }
+
+    public Builder withExpression(String expression) {
+      this.expression = expression;
+      return this;
+    }
+
+    public Builder withType(DruidType type) {
+      this.type = type;
+      return this;
+    }
+
+    public VirtualColumn build() {
+      return new VirtualColumn(name, expression, type);
+    }
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+}
+
+// End VirtualColumn.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5acf84f3/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java
----------------------------------------------------------------------
diff --git 
a/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java
 
b/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java
index e8e42be..16e1f59 100644
--- 
a/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java
+++ 
b/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.adapter.druid;
 
+import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
@@ -30,13 +31,12 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.io.StringWriter;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.util.List;
 
@@ -47,8 +47,22 @@ import static org.hamcrest.core.Is.is;
  */
 public class DruidQueryFilterTest {
 
-  @Test public void testInFilter() throws NoSuchMethodException,
-      InvocationTargetException, IllegalAccessException, IOException {
+  private DruidQuery druidQuery;
+  @Before
+  public void testSetup() {
+    druidQuery = Mockito.mock(DruidQuery.class);
+    final CalciteConnectionConfig connectionConfigMock = Mockito
+        .mock(CalciteConnectionConfig.class);
+    Mockito.when(connectionConfigMock.timeZone()).thenReturn("UTC");
+    
Mockito.when(druidQuery.getConnectionConfig()).thenReturn(connectionConfigMock);
+    Mockito.when(druidQuery.getDruidTable())
+        .thenReturn(
+            new DruidTable(Mockito.mock(DruidSchema.class), "dataSource", null,
+                ImmutableSet.<String>of(), "timestamp", null, null,
+                null
+            ));
+  }
+  @Test public void testInFilter() throws IOException {
     final Fixture f = new Fixture();
     final List<? extends RexNode> listRexNodes =
         ImmutableList.of(f.rexBuilder.makeInputRef(f.varcharRowType, 0),
@@ -58,13 +72,9 @@ public class DruidQueryFilterTest {
 
     RexNode inRexNode =
         f.rexBuilder.makeCall(SqlStdOperatorTable.IN, listRexNodes);
-    Method translateFilter =
-        DruidQuery.Translator.class.getDeclaredMethod("translateFilter",
-            RexNode.class);
-    translateFilter.setAccessible(true);
-    DruidQuery.JsonInFilter returnValue =
-        (DruidQuery.JsonInFilter) 
translateFilter.invoke(f.translatorStringKind,
-            inRexNode);
+    DruidJsonFilter returnValue = DruidJsonFilter
+        .toDruidFilters(inRexNode, f.varcharRowType, druidQuery);
+    Assert.assertNotNull("Filter is null", returnValue);
     JsonFactory jsonFactory = new JsonFactory();
     final StringWriter sw = new StringWriter();
     JsonGenerator jsonGenerator = jsonFactory.createGenerator(sw);
@@ -76,8 +86,7 @@ public class DruidQueryFilterTest {
             + "\"values\":[\"1\",\"5\",\"value1\"]}"));
   }
 
-  @Test public void testBetweenFilterStringCase() throws NoSuchMethodException,
-      InvocationTargetException, IllegalAccessException, IOException {
+  @Test public void testBetweenFilterStringCase() throws IOException {
     final Fixture f = new Fixture();
     final List<RexNode> listRexNodes =
         ImmutableList.of(f.rexBuilder.makeLiteral(false),
@@ -88,13 +97,9 @@ public class DruidQueryFilterTest {
     RexNode betweenRexNode = f.rexBuilder.makeCall(relDataType,
         SqlStdOperatorTable.BETWEEN, listRexNodes);
 
-    Method translateFilter =
-        DruidQuery.Translator.class.getDeclaredMethod("translateFilter",
-            RexNode.class);
-    translateFilter.setAccessible(true);
-    DruidQuery.JsonBound returnValue =
-        (DruidQuery.JsonBound) translateFilter.invoke(f.translatorStringKind,
-            betweenRexNode);
+    DruidJsonFilter returnValue = DruidJsonFilter
+        .toDruidFilters(betweenRexNode, f.varcharRowType, druidQuery);
+    Assert.assertNotNull("Filter is null", returnValue);
     JsonFactory jsonFactory = new JsonFactory();
     final StringWriter sw = new StringWriter();
     JsonGenerator jsonGenerator = jsonFactory.createGenerator(sw);
@@ -120,8 +125,6 @@ public class DruidQueryFilterTest {
     final RelDataType varcharRowType = typeFactory.builder()
         .add("dimensionName", varcharType)
         .build();
-    final DruidQuery.Translator translatorStringKind =
-        new DruidQuery.Translator(druidTable, varcharRowType, "UTC");
   }
 }
 

Reply via email to