http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidResultEnumerator.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidResultEnumerator.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidResultEnumerator.java deleted file mode 100644 index 35b97b3..0000000 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidResultEnumerator.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.adapter.druid; - -/** - * Created by jhyde on 3/9/16. - */ -public class DruidResultEnumerator { -} - -// End DruidResultEnumerator.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java index 290f548..7b6bc78 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java @@ -22,7 +22,6 @@ import org.apache.calcite.plan.RelOptPredicateList; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; @@ -41,6 +40,7 @@ import org.apache.calcite.rel.rules.PushProjector; import org.apache.calcite.rel.rules.SortProjectTransposeRule; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexExecutor; @@ -50,14 +50,10 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexShuttle; import org.apache.calcite.rex.RexSimplify; import org.apache.calcite.rex.RexUtil; -import org.apache.calcite.runtime.PredicateImpl; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.sql.type.SqlTypeFamily; -import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.tools.RelBuilderFactory; -import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.Pair; import org.apache.calcite.util.Util; import org.apache.calcite.util.trace.CalciteTrace; @@ -67,7 +63,7 @@ import org.apache.commons.lang3.tuple.Triple; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.joda.time.Interval; @@ -79,6 +75,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import javax.annotation.Nullable; + /** * Rules and relational operators for {@link DruidQuery}. */ @@ -113,6 +111,8 @@ public class DruidRules { new DruidPostAggregationProjectRule(RelFactories.LOGICAL_BUILDER); public static final DruidAggregateExtractProjectRule PROJECT_EXTRACT_RULE = new DruidAggregateExtractProjectRule(RelFactories.LOGICAL_BUILDER); + public static final DruidHavingFilterRule DRUID_HAVING_FILTER_RULE = + new DruidHavingFilterRule(RelFactories.LOGICAL_BUILDER); public static final List<RelOptRule> RULES = ImmutableList.of(FILTER, @@ -127,73 +127,8 @@ public class DruidRules { FILTER_PROJECT_TRANSPOSE, PROJECT_SORT_TRANSPOSE, SORT, - SORT_PROJECT_TRANSPOSE); - - /** Predicate that returns whether Druid can not handle an aggregate. */ - private static final Predicate<Triple<Aggregate, RelNode, DruidQuery>> BAD_AGG = - new PredicateImpl<Triple<Aggregate, RelNode, DruidQuery>>() { - public boolean test(Triple<Aggregate, RelNode, DruidQuery> triple) { - final Aggregate aggregate = triple.getLeft(); - final RelNode node = triple.getMiddle(); - final DruidQuery query = triple.getRight(); - - final CalciteConnectionConfig config = query.getConnectionConfig(); - for (AggregateCall aggregateCall : aggregate.getAggCallList()) { - switch (aggregateCall.getAggregation().getKind()) { - case COUNT: - // Druid count aggregator can handle 3 scenarios: - // 1. count(distinct col) when approximate results - // are acceptable and col is not a metric. - // Note that exact count(distinct column) is handled - // by being rewritten into group by followed by count - // 2. count(*) - // 3. count(column) - - if (checkAggregateOnMetric(ImmutableBitSet.of(aggregateCall.getArgList()), - node, query)) { - return true; - } - // case count(*) - if (aggregateCall.getArgList().isEmpty()) { - continue; - } - // case count(column) - if (aggregateCall.getArgList().size() == 1 && !aggregateCall.isDistinct()) { - continue; - } - // case count(distinct and is approximate) - if (aggregateCall.isDistinct() - && (aggregateCall.isApproximate() || config.approximateDistinctCount())) { - continue; - } - return true; - case SUM: - case SUM0: - case MIN: - case MAX: - final RelDataType type = aggregateCall.getType(); - final SqlTypeName sqlTypeName = type.getSqlTypeName(); - if (SqlTypeFamily.APPROXIMATE_NUMERIC.getTypeNames().contains(sqlTypeName) - || SqlTypeFamily.INTEGER.getTypeNames().contains(sqlTypeName)) { - continue; - } else if (SqlTypeFamily.EXACT_NUMERIC.getTypeNames().contains(sqlTypeName)) { - // Decimal - assert sqlTypeName == SqlTypeName.DECIMAL; - if (type.getScale() == 0 || config.approximateDecimal()) { - // If scale is zero or we allow approximating decimal, we can proceed - continue; - } - } - // Cannot handle this aggregate function - return true; - default: - // Cannot handle this aggregate function - return true; - } - } - return false; - } - }; + SORT_PROJECT_TRANSPOSE, + DRUID_HAVING_FILTER_RULE); /** * Rule to push a {@link org.apache.calcite.rel.core.Filter} into a {@link DruidQuery}. @@ -231,7 +166,9 @@ public class DruidRules { new RexSimplify(rexBuilder, predicates, true, executor); final RexNode cond = simplify.simplify(filter.getCondition()); for (RexNode e : RelOptUtil.conjunctions(cond)) { - if (query.isValidFilter(e)) { + DruidJsonFilter druidJsonFilter = DruidJsonFilter + .toDruidFilters(e, filter.getInput().getRowType(), query); + if (druidJsonFilter != null) { validPreds.add(e); } else { nonValidPreds.add(e); @@ -239,19 +176,17 @@ public class DruidRules { } // Timestamp - int timestampFieldIdx = -1; - for (int i = 0; i < query.getRowType().getFieldCount(); i++) { - if (query.druidTable.timestampFieldName.equals( - query.getRowType().getFieldList().get(i).getName())) { - timestampFieldIdx = i; - break; - } - } - + int timestampFieldIdx = Iterables + .indexOf(query.getRowType().getFieldList(), new Predicate<RelDataTypeField>() { + @Override public boolean apply(@Nullable RelDataTypeField input) { + return query.druidTable.timestampFieldName.equals(input.getName()); + } + }); + RelNode newDruidQuery = query; final Triple<List<RexNode>, List<RexNode>, List<RexNode>> triple = splitFilters(rexBuilder, query, validPreds, nonValidPreds, timestampFieldIdx); if (triple.getLeft().isEmpty() && triple.getMiddle().isEmpty()) { - // We can't push anything useful to Druid. + //it sucks, nothing to push return; } final List<RexNode> residualPreds = new ArrayList<>(triple.getRight()); @@ -262,13 +197,14 @@ public class DruidRules { assert timeZone != null; intervals = DruidDateTimeUtils.createInterval( RexUtil.composeConjunction(rexBuilder, triple.getLeft(), false), - timeZone); + + query.getConnectionConfig().timeZone()); if (intervals == null || intervals.isEmpty()) { - // Case we have an filter with extract that can not be written as interval push down + // Case we have a filter with extract that can not be written as interval push down triple.getMiddle().addAll(triple.getLeft()); } } - RelNode newDruidQuery = query; + if (!triple.getMiddle().isEmpty()) { final RelNode newFilter = filter.copy(filter.getTraitSet(), Util.last(query.rels), RexUtil.composeConjunction(rexBuilder, triple.getMiddle(), false)); @@ -304,13 +240,9 @@ public class DruidRules { for (RexNode conj : validPreds) { final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor(); conj.accept(visitor); - if (visitor.inputPosReferenced.contains(timestampFieldIdx)) { - if (visitor.inputPosReferenced.size() != 1) { - // Complex predicate, transformation currently not supported - nonPushableNodes.add(conj); - } else { - timeRangeNodes.add(conj); - } + if (visitor.inputPosReferenced.contains(timestampFieldIdx) + && visitor.inputPosReferenced.size() == 1) { + timeRangeNodes.add(conj); } else { pushableNodes.add(conj); } @@ -320,6 +252,36 @@ public class DruidRules { } /** + * Rule to Push a Having {@link Filter} into a {@link DruidQuery} + */ + public static class DruidHavingFilterRule extends RelOptRule { + + public DruidHavingFilterRule(RelBuilderFactory relBuilderFactory) { + super(operand(Filter.class, operand(DruidQuery.class, none())), + relBuilderFactory, null); + } + + @Override public void onMatch(RelOptRuleCall call) { + final Filter filter = call.rel(0); + final DruidQuery query = call.rel(1); + + if (!DruidQuery.isValidSignature(query.signature() + 'h')) { + return; + } + + final RexNode cond = filter.getCondition(); + final DruidJsonFilter druidJsonFilter = DruidJsonFilter + .toDruidFilters(cond, query.getTopNode().getRowType(), query); + if (druidJsonFilter != null) { + final RelNode newFilter = filter + .copy(filter.getTraitSet(), Util.last(query.rels), filter.getCondition()); + final DruidQuery newDruidQuery = DruidQuery.extendQuery(query, newFilter); + call.transformTo(newDruidQuery); + } + } + } + + /** * Rule to push a {@link org.apache.calcite.rel.core.Project} into a {@link DruidQuery}. */ public static class DruidProjectRule extends RelOptRule { @@ -343,14 +305,16 @@ public class DruidRules { return; } - if (canProjectAll(project.getProjects())) { + if (DruidQuery.computeProjectAsScan(project, query.getTable().getRowType(), query) + != null) { // All expressions can be pushed to Druid in their entirety. final RelNode newProject = project.copy(project.getTraitSet(), - ImmutableList.of(Util.last(query.rels))); + ImmutableList.of(Util.last(query.rels))); RelNode newNode = DruidQuery.extendQuery(query, newProject); call.transformTo(newNode); return; } + final Pair<List<RexNode>, List<RexNode>> pair = splitProjects(rexBuilder, query, project.getProjects()); if (pair == null) { @@ -378,15 +342,6 @@ public class DruidRules { call.transformTo(newProject2); } - private static boolean canProjectAll(List<RexNode> nodes) { - for (RexNode e : nodes) { - if (!(e instanceof RexInputRef)) { - return false; - } - } - return true; - } - private static Pair<List<RexNode>, List<RexNode>> splitProjects(final RexBuilder rexBuilder, final RelNode input, List<RexNode> nodes) { final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor(); @@ -442,183 +397,37 @@ public class DruidRules { public void onMatch(RelOptRuleCall call) { Project project = call.rel(0); DruidQuery query = call.rel(1); - final RelOptCluster cluster = project.getCluster(); - final RexBuilder rexBuilder = cluster.getRexBuilder(); if (!DruidQuery.isValidSignature(query.signature() + 'o')) { return; } - Pair<ImmutableMap<String, String>, Boolean> scanned = scanProject(query, project); - // Only try to push down Project when there will be Post aggregators in result DruidQuery - if (scanned.right) { - Pair<Project, Project> splitProjectAggregate = splitProject(rexBuilder, query, - project, scanned.left, cluster); - Project inner = splitProjectAggregate.left; - Project outer = splitProjectAggregate.right; - DruidQuery newQuery = DruidQuery.extendQuery(query, inner); - // When all project get pushed into DruidQuery, the project can be replaced by DruidQuery. - if (outer != null) { - Project newProject = outer.copy(outer.getTraitSet(), newQuery, outer.getProjects(), - outer.getRowType()); - call.transformTo(newProject); - } else { - call.transformTo(newQuery); + boolean hasRexCalls = false; + for (RexNode rexNode : project.getChildExps()) { + if (rexNode instanceof RexCall) { + hasRexCalls = true; + break; } } - } + // Only try to push down Project when there will be Post aggregators in result DruidQuery + if (hasRexCalls) { - /** - * Similar to split Project in DruidProjectRule. It used the name mapping from scanProject - * to render the correct field names of inner project so that the outer project can correctly - * refer to them. For RexNode that can be parsed into post aggregator, they will get pushed in - * before input reference, then outer project can simply refer to those pushed in RexNode to - * get result. - * @param rexBuilder builder from cluster - * @param query matched Druid Query - * @param project matched project takes in druid - * @param nameMap Result nameMapping from scanProject - * @param cluster cluster that provide builder for row type. - * @return Triple object contains inner project, outer project and required - * Json Post Aggregation objects to be pushed down into Druid Query. - */ - public Pair<Project, Project> splitProject(final RexBuilder rexBuilder, - DruidQuery query, Project project, ImmutableMap<String, String> nameMap, - final RelOptCluster cluster) { - //Visit & Build Inner Project - final List<RexNode> innerRex = new ArrayList<>(); - final RelDataTypeFactory.Builder typeBuilder = - cluster.getTypeFactory().builder(); - final RelOptUtil.InputReferencedVisitor visitor = - new RelOptUtil.InputReferencedVisitor(); - final List<Integer> positions = new ArrayList<>(); - final List<RelDataType> innerTypes = new ArrayList<>(); - // Similar logic to splitProject in DruidProject Rule - // However, post aggregation will also be output of DruidQuery and they will be - // added before other input. - int offset = 0; - for (Pair<RexNode, String> pair : project.getNamedProjects()) { - RexNode rex = pair.left; - String name = pair.right; - String fieldName = nameMap.get(name); - if (fieldName == null) { - rex.accept(visitor); + final RelNode topNode = query.getTopNode(); + final Aggregate topAgg; + if (topNode instanceof Aggregate) { + topAgg = (Aggregate) topNode; } else { - final RexNode node = rexBuilder.copy(rex); - innerRex.add(node); - positions.add(offset++); - typeBuilder.add(nameMap.get(name), node.getType()); - innerTypes.add(node.getType()); + topAgg = (Aggregate) ((Filter) topNode).getInput(); } - } - // Other referred input will be added into the inner project rex list. - positions.addAll(visitor.inputPosReferenced); - for (int i : visitor.inputPosReferenced) { - final RexNode node = rexBuilder.makeInputRef(Util.last(query.rels), i); - innerRex.add(node); - typeBuilder.add(query.getRowType().getFieldNames().get(i), node.getType()); - innerTypes.add(node.getType()); - } - Project innerProject = project.copy(project.getTraitSet(), Util.last(query.rels), innerRex, - typeBuilder.build()); - // If the whole project is pushed, we do not need to do anything else. - if (project.getNamedProjects().size() == nameMap.size()) { - return new Pair<>(innerProject, null); - } - // Build outer Project when some projects are left in outer project. - offset = 0; - final List<RexNode> outerRex = new ArrayList<>(); - for (Pair<RexNode, String> pair : project.getNamedProjects()) { - RexNode rex = pair.left; - String name = pair.right; - if (!nameMap.containsKey(name)) { - outerRex.add( - rex.accept( - new RexShuttle() { - @Override public RexNode visitInputRef(RexInputRef ref) { - final int j = positions.indexOf(ref.getIndex()); - return rexBuilder.makeInputRef(innerTypes.get(j), j); - } - })); - } else { - outerRex.add( - rexBuilder.makeInputRef(rex.getType(), - positions.indexOf(offset++))); - } - } - Project outerProject = project.copy(project.getTraitSet(), innerProject, - outerRex, project.getRowType()); - return new Pair<>(innerProject, outerProject); - } - - /** - * Scans the project. - * - * <p>Takes Druid Query as input to figure out which expression can be - * pushed down. Also returns a map to show the correct field name in Druid - * Query for columns get pushed in. - * - * @param query matched Druid Query - * @param project Matched project that takes in Druid Query - * @return Pair that shows how name map with each other. - */ - public Pair<ImmutableMap<String, String>, Boolean> scanProject( - DruidQuery query, Project project) { - List<String> aggNamesWithGroup = query.getRowType().getFieldNames(); - final ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder(); - int j = 0; - boolean ret = false; - for (Pair<RexNode, String> namedProject : project.getNamedProjects()) { - RexNode rex = namedProject.left; - String name = namedProject.right; - // Find out the corresponding fieldName for DruidQuery to fetch result - // in DruidConnectionImpl, give specific name for post aggregator - if (rex instanceof RexCall) { - if (checkPostAggregatorExist(rex)) { - String postAggName = "postagg#" + j++; - mapBuilder.put(name, postAggName); - ret = true; - } - } else if (rex instanceof RexInputRef) { - String fieldName = aggNamesWithGroup.get(((RexInputRef) rex).getIndex()); - mapBuilder.put(name, fieldName); - } - } - return new Pair<>(mapBuilder.build(), ret); - } - /** - * Recursively check whether the rexNode can be parsed into post aggregator in druid query - * Have to fulfill conditions below: - * 1. Arithmetic operation +, -, /, * or CAST in SQL - * 2. Simple input reference refer to the result of Aggregate or Grouping - * 3. A constant - * 4. All input referred should also be able to be parsed - * @param rexNode input RexNode to be recursively checked - * @return a boolean shows whether this rexNode can be parsed or not. - */ - public boolean checkPostAggregatorExist(RexNode rexNode) { - if (rexNode instanceof RexCall) { - for (RexNode ele : ((RexCall) rexNode).getOperands()) { - boolean inputRex = checkPostAggregatorExist(ele); - if (!inputRex) { - return false; + for (RexNode rexNode : project.getProjects()) { + if (DruidExpressions.toDruidExpression(rexNode, topAgg.getRowType(), query) == null) { + return; } } - switch (rexNode.getKind()) { - case PLUS: - case MINUS: - case DIVIDE: - case TIMES: - //case CAST: - return true; - default: - return false; - } - } else if (rexNode instanceof RexInputRef || rexNode instanceof RexLiteral) { - // Do not have to check the source of input because the signature checking ensure - // the input of project must be Aggregate. - return true; + final RelNode newProject = project + .copy(project.getTraitSet(), ImmutableList.of(Util.last(query.rels))); + final DruidQuery newQuery = DruidQuery.extendQuery(query, newProject); + call.transformTo(newQuery); } - return false; } } @@ -640,28 +449,30 @@ public class DruidRules { public void onMatch(RelOptRuleCall call) { final Aggregate aggregate = call.rel(0); final DruidQuery query = call.rel(1); + final RelNode topDruidNode = query.getTopNode(); + final Project project = topDruidNode instanceof Project ? (Project) topDruidNode : null; if (!DruidQuery.isValidSignature(query.signature() + 'a')) { return; } if (aggregate.indicator - || aggregate.getGroupSets().size() != 1 - || BAD_AGG.apply(ImmutableTriple.of(aggregate, (RelNode) aggregate, query)) - || !validAggregate(aggregate, query)) { + || aggregate.getGroupSets().size() != 1) { return; } - final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(), - ImmutableList.of(Util.last(query.rels))); - call.transformTo(DruidQuery.extendQuery(query, newAggregate)); - } - - /* Check whether agg functions reference timestamp */ - private static boolean validAggregate(Aggregate aggregate, DruidQuery query) { - ImmutableBitSet.Builder builder = ImmutableBitSet.builder(); - for (AggregateCall aggCall : aggregate.getAggCallList()) { - builder.addAll(aggCall.getArgList()); + if (DruidQuery + .computeProjectGroupSet(project, aggregate.getGroupSet(), query.table.getRowType(), query) + == null) { + return; + } + final List<String> aggNames = Util + .skip(aggregate.getRowType().getFieldNames(), aggregate.getGroupSet().cardinality()); + if (DruidQuery.computeDruidJsonAgg(aggregate.getAggCallList(), aggNames, project, query) + == null) { + return; } - return !checkTimestampRefOnQuery(builder.build(), query.getTopNode(), query); + final RelNode newAggregate = aggregate + .copy(aggregate.getTraitSet(), ImmutableList.of(query.getTopNode())); + call.transformTo(DruidQuery.extendQuery(query, newAggregate)); } } @@ -691,34 +502,26 @@ public class DruidRules { if (!DruidQuery.isValidSignature(query.signature() + 'p' + 'a')) { return; } - - int timestampIdx = validProject(project, query); - List<Integer> filterRefs = getFilterRefs(aggregate.getAggCallList()); - - if (timestampIdx == -1 && filterRefs.size() == 0) { + if (aggregate.indicator + || aggregate.getGroupSets().size() != 1) { return; } - - // Check that the filters that the Aggregate calls refer to are valid filters can be pushed - // into Druid - for (Integer i : filterRefs) { - RexNode filterNode = project.getProjects().get(i); - if (!query.isValidFilter(filterNode) || filterNode.isAlwaysFalse()) { - return; - } + if (DruidQuery + .computeProjectGroupSet(project, aggregate.getGroupSet(), query.table.getRowType(), query) + == null) { + return; } - - if (aggregate.indicator - || aggregate.getGroupSets().size() != 1 - || BAD_AGG.apply(ImmutableTriple.of(aggregate, (RelNode) project, query)) - || !validAggregate(aggregate, timestampIdx, filterRefs.size())) { + final List<String> aggNames = Util + .skip(aggregate.getRowType().getFieldNames(), aggregate.getGroupSet().cardinality()); + if (DruidQuery.computeDruidJsonAgg(aggregate.getAggCallList(), aggNames, project, query) + == null) { return; } final RelNode newProject = project.copy(project.getTraitSet(), ImmutableList.of(Util.last(query.rels))); final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(), ImmutableList.of(newProject)); - + List<Integer> filterRefs = getFilterRefs(aggregate.getAggCallList()); final DruidQuery query2; if (filterRefs.size() > 0) { query2 = optimizeFilteredAggregations(call, query, (Project) newProject, @@ -912,81 +715,6 @@ public class DruidRules { return refs; } - /* To be a valid Project, we allow it to contain references, and a single call - * to a FLOOR function on the timestamp column OR valid time EXTRACT on the timestamp column. - * Returns the reference to the timestamp, if any. */ - private static int validProject(Project project, DruidQuery query) { - List<RexNode> nodes = project.getProjects(); - int idxTimestamp = -1; - boolean hasFloor = false; - for (int i = 0; i < nodes.size(); i++) { - final RexNode e = nodes.get(i); - if (e instanceof RexCall) { - // It is a call, check that it is EXTRACT and follow-up conditions - final RexCall call = (RexCall) e; - final String timeZone = query.getCluster().getPlanner().getContext() - .unwrap(CalciteConnectionConfig.class).timeZone(); - assert timeZone != null; - if (DruidDateTimeUtils.extractGranularity(call, timeZone) == null) { - return -1; - } - if (idxTimestamp != -1 && hasFloor) { - // Already one usage of timestamp column - return -1; - } - switch (call.getKind()) { - case FLOOR: - hasFloor = true; - if (!(call.getOperands().get(0) instanceof RexInputRef)) { - return -1; - } - final RexInputRef ref = (RexInputRef) call.getOperands().get(0); - if (!(checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()), - query.getTopNode(), - query))) { - return -1; - } - idxTimestamp = i; - break; - case EXTRACT: - idxTimestamp = RelOptUtil.InputFinder.bits(call).asList().get(0); - break; - default: - throw new AssertionError(); - } - continue; - } - if (!(e instanceof RexInputRef)) { - // It needs to be a reference - return -1; - } - final RexInputRef ref = (RexInputRef) e; - if (checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()), - query.getTopNode(), query)) { - if (idxTimestamp != -1) { - // Already one usage of timestamp column - return -1; - } - idxTimestamp = i; - } - } - return idxTimestamp; - } - - private static boolean validAggregate(Aggregate aggregate, int idx, int numFilterRefs) { - if (numFilterRefs > 0 && idx < 0) { - return true; - } - if (!aggregate.getGroupSet().get(idx)) { - return false; - } - for (AggregateCall aggCall : aggregate.getAggCallList()) { - if (aggCall.getArgList().contains(idx)) { - return false; - } - } - return true; - } } /** @@ -1054,146 +782,20 @@ public class DruidRules { return; } // Either it is: - // - a sort and limit on a dimension/metric part of the druid group by query or - // - a sort without limit on the time column on top of - // Agg operator (transformable to timeseries query), or - // - a simple limit on top of other operator than Agg - if (!validSortLimit(sort, query)) { - return; - } - final RelNode newSort = sort.copy(sort.getTraitSet(), - ImmutableList.of(Util.last(query.rels))); - call.transformTo(DruidQuery.extendQuery(query, newSort)); - } - - /** Checks whether sort is valid. */ - private static boolean validSortLimit(Sort sort, DruidQuery query) { + // - a pure limit above a query of type scan + // - a sort and limit on a dimension/metric part of the druid group by query if (sort.offset != null && RexLiteral.intValue(sort.offset) != 0) { // offset not supported by Druid - return false; - } - // Use a different logic to push down Sort RelNode because the top node could be a Project now - RelNode topNode = query.getTopNode(); - Aggregate topAgg; - if (topNode instanceof Project && ((Project) topNode).getInput() instanceof Aggregate) { - topAgg = (Aggregate) ((Project) topNode).getInput(); - } else if (topNode instanceof Aggregate) { - topAgg = (Aggregate) topNode; - } else { - // If it is going to be a Druid select operator, we push the limit if - // it does not contain a sort specification (required by Druid) - return RelOptUtil.isPureLimit(sort); - } - final ImmutableBitSet.Builder positionsReferenced = ImmutableBitSet.builder(); - for (RelFieldCollation col : sort.collation.getFieldCollations()) { - int idx = col.getFieldIndex(); - if (idx >= topAgg.getGroupCount()) { - continue; - } - //has the indexes of the columns used for sorts - positionsReferenced.set(topAgg.getGroupSet().nth(idx)); - } - // Case it is a timeseries query - if (checkIsFlooringTimestampRefOnQuery(topAgg.getGroupSet(), topAgg.getInput(), query) - && topAgg.getGroupCount() == 1) { - // do not push if it has a limit or more than one sort key or we have sort by - // metric/dimension - return !RelOptUtil.isLimit(sort) && sort.collation.getFieldCollations().size() == 1 - && checkTimestampRefOnQuery(positionsReferenced.build(), topAgg.getInput(), query); - } - return true; - } - } - - /** Returns true if any of the grouping key is a floor operator over the timestamp column. */ - private static boolean checkIsFlooringTimestampRefOnQuery(ImmutableBitSet set, RelNode top, - DruidQuery query) { - if (top instanceof Project) { - ImmutableBitSet.Builder newSet = ImmutableBitSet.builder(); - final Project project = (Project) top; - for (int index : set) { - RexNode node = project.getProjects().get(index); - if (node instanceof RexCall) { - RexCall call = (RexCall) node; - final String timeZone = query.getCluster().getPlanner().getContext() - .unwrap(CalciteConnectionConfig.class).timeZone(); - assert timeZone != null; - assert DruidDateTimeUtils.extractGranularity(call, timeZone) != null; - if (call.getKind() == SqlKind.FLOOR) { - newSet.addAll(RelOptUtil.InputFinder.bits(call)); - } - } - } - top = project.getInput(); - set = newSet.build(); - } - // Check if any references the timestamp column - for (int index : set) { - if (query.druidTable.timestampFieldName.equals( - top.getRowType().getFieldNames().get(index))) { - return true; - } - } - - return false; - } - - /** Checks whether any of the references leads to the timestamp column. */ - private static boolean checkTimestampRefOnQuery(ImmutableBitSet set, RelNode top, - DruidQuery query) { - if (top instanceof Project) { - ImmutableBitSet.Builder newSet = ImmutableBitSet.builder(); - final Project project = (Project) top; - for (int index : set) { - RexNode node = project.getProjects().get(index); - if (node instanceof RexInputRef) { - newSet.set(((RexInputRef) node).getIndex()); - } else if (node instanceof RexCall) { - RexCall call = (RexCall) node; - final String timeZone = query.getCluster().getPlanner().getContext() - .unwrap(CalciteConnectionConfig.class).timeZone(); - assert timeZone != null; - assert DruidDateTimeUtils.extractGranularity(call, timeZone) != null; - // when we have extract from time column the rexCall is of the form - // "/Reinterpret$0" - newSet.addAll(RelOptUtil.InputFinder.bits(call)); - } + return; } - top = project.getInput(); - set = newSet.build(); - } - - // Check if any references the timestamp column - for (int index : set) { - if (query.druidTable.timestampFieldName.equals( - top.getRowType().getFieldNames().get(index))) { - return true; + if (query.getQueryType() == QueryType.SCAN && !RelOptUtil.isPureLimit(sort)) { + return; } - } - return false; - } - - /** Checks whether any of the references leads to a metric column. */ - private static boolean checkAggregateOnMetric(ImmutableBitSet set, RelNode topProject, - DruidQuery query) { - if (topProject instanceof Project) { - ImmutableBitSet.Builder newSet = ImmutableBitSet.builder(); - final Project project = (Project) topProject; - for (int index : set) { - RexNode node = project.getProjects().get(index); - ImmutableBitSet setOfBits = RelOptUtil.InputFinder.bits(node); - newSet.addAll(setOfBits); - } - set = newSet.build(); - } - for (int index : set) { - if (query.druidTable.isMetric(query.getTopNode().getRowType().getFieldNames().get(index))) { - return true; - } + final RelNode newSort = sort + .copy(sort.getTraitSet(), ImmutableList.of(Util.last(query.rels))); + call.transformTo(DruidQuery.extendQuery(query, newSort)); } - - return false; } /** http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlCastConverter.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlCastConverter.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlCastConverter.java new file mode 100644 index 0000000..0731a6f --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlCastConverter.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.druid; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; + +import com.google.common.collect.ImmutableList; + +import org.joda.time.Period; + +import java.util.TimeZone; + +/** + * Druid cast converter operator used to translates calcite casts to Druid expression cast + */ +public class DruidSqlCastConverter implements DruidSqlOperatorConverter { + + @Override public SqlOperator calciteOperator() { + return SqlStdOperatorTable.CAST; + } + + @Override public String toDruidExpression(RexNode rexNode, RelDataType topRel, + DruidQuery druidQuery) { + + final RexNode operand = ((RexCall) rexNode).getOperands().get(0); + final String operandExpression = DruidExpressions.toDruidExpression(operand, + topRel, druidQuery); + + if (operandExpression == null) { + return null; + } + + final SqlTypeName fromType = operand.getType().getSqlTypeName(); + final SqlTypeName toType = rexNode.getType().getSqlTypeName(); + final String timeZoneConf = druidQuery.getConnectionConfig().timeZone(); + final TimeZone timeZone = TimeZone.getTimeZone(timeZoneConf == null ? "UTC" : timeZoneConf); + + if (SqlTypeName.CHAR_TYPES.contains(fromType) && SqlTypeName.DATETIME_TYPES.contains(toType)) { + //case chars to dates + return castCharToDateTime(timeZone, operandExpression, + toType); + } else if (SqlTypeName.DATETIME_TYPES.contains(fromType) && SqlTypeName.CHAR_TYPES.contains + (toType)) { + //case dates to chars + return castDateTimeToChar(timeZone, operandExpression, + fromType); + } else { + // Handle other casts. + final DruidType fromExprType = DruidExpressions.EXPRESSION_TYPES.get(fromType); + final DruidType toExprType = DruidExpressions.EXPRESSION_TYPES.get(toType); + + if (fromExprType == null || toExprType == null) { + // Unknown types bail out. + return null; + } + final String typeCastExpression; + if (fromExprType != toExprType) { + typeCastExpression = DruidQuery.format("CAST(%s, '%s')", operandExpression, + toExprType + .toString()); + } else { + // case it is the same type it is ok to skip CAST + typeCastExpression = operandExpression; + } + + if (toType == SqlTypeName.DATE) { + // Floor to day when casting to DATE. + return DruidExpressions.applyTimestampFloor( + typeCastExpression, + Period.days(1).toString(), + "", + TimeZone.getTimeZone(druidQuery.getConnectionConfig().timeZone())); + } else { + return typeCastExpression; + } + + } + } + + private static String castCharToDateTime( + TimeZone timeZone, + String operand, + final SqlTypeName toType) { + // Cast strings to date times by parsing them from SQL format. + final String timestampExpression = DruidExpressions.functionCall( + "timestamp_parse", + ImmutableList.of( + operand, + DruidExpressions.stringLiteral(""), + DruidExpressions.stringLiteral(timeZone.getID()))); + + if (toType == SqlTypeName.DATE) { + // case to date we need to floor to day first + return DruidExpressions.applyTimestampFloor( + timestampExpression, + Period.days(1).toString(), + "", + timeZone); + } else if (toType == SqlTypeName.TIMESTAMP || toType == SqlTypeName + .TIMESTAMP_WITH_LOCAL_TIME_ZONE) { + return timestampExpression; + } else { + throw new IllegalStateException( + DruidQuery.format("Unsupported DateTime type[%s]", toType)); + } + } + + private static String castDateTimeToChar( + final TimeZone timeZone, + final String operand, + final SqlTypeName fromType) { + return DruidExpressions.functionCall( + "timestamp_format", + ImmutableList.of( + operand, + DruidExpressions.stringLiteral(dateTimeFormatString(fromType)), + DruidExpressions.stringLiteral(timeZone.getID()))); + } + + public static String dateTimeFormatString(final SqlTypeName sqlTypeName) { + if (sqlTypeName == SqlTypeName.DATE) { + return "yyyy-MM-dd"; + } else if (sqlTypeName == SqlTypeName.TIMESTAMP) { + return "yyyy-MM-dd HH:mm:ss"; + } else if (sqlTypeName == sqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) { + return "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + } else { + return null; + } + } +} + +// End DruidSqlCastConverter.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlOperatorConverter.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlOperatorConverter.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlOperatorConverter.java new file mode 100644 index 0000000..0ee179a --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlOperatorConverter.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.druid; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; + +import javax.annotation.Nullable; + +/** + * Defines how to convert RexNode with a given calcite SQL operator to Druid expressions + */ +public interface DruidSqlOperatorConverter { + + /** + * Returns the calcite SQL operator corresponding to Druid operator. + * + * @return operator + */ + SqlOperator calciteOperator(); + + + /** + * Translate rexNode to valid Druid expression. + * @param rexNode rexNode to translate to Druid expression + * @param rowType row type associated with rexNode + * @param druidQuery druid query used to figure out configs/fields related like timeZone + * + * @return valid Druid expression or null if it can not convert the rexNode + */ + @Nullable String toDruidExpression(RexNode rexNode, RelDataType rowType, DruidQuery druidQuery); +} + +// End DruidSqlOperatorConverter.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidType.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidType.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidType.java index f50fdfd..ec601b7 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidType.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidType.java @@ -21,10 +21,10 @@ import org.apache.calcite.sql.type.SqlTypeName; /** Druid type. */ public enum DruidType { LONG(SqlTypeName.BIGINT), - // SQL DOUBLE and FLOAT types are both 64 bit, but we use DOUBLE because - // people find FLOAT confusing. - FLOAT(SqlTypeName.DOUBLE), + FLOAT(SqlTypeName.FLOAT), + DOUBLE(SqlTypeName.DOUBLE), STRING(SqlTypeName.VARCHAR), + COMPLEX(SqlTypeName.OTHER), HYPER_UNIQUE(SqlTypeName.VARBINARY), THETA_SKETCH(SqlTypeName.VARBINARY); @@ -39,13 +39,13 @@ public enum DruidType { * Returns true if and only if this enum should be used inside of a {@link ComplexMetric} * */ public boolean isComplex() { - return this == THETA_SKETCH || this == HYPER_UNIQUE; + return this == THETA_SKETCH || this == HYPER_UNIQUE || this == COMPLEX; } /** * Returns a DruidType matching the given String type from a Druid metric * */ - public static DruidType getTypeFromMetric(String type) { + protected static DruidType getTypeFromMetric(String type) { assert type != null; if (type.equals("hyperUnique")) { return HYPER_UNIQUE; @@ -54,6 +54,8 @@ public enum DruidType { } else if (type.startsWith("long") || type.equals("count")) { return LONG; } else if (type.startsWith("double")) { + return DOUBLE; + } else if (type.startsWith("float")) { return FLOAT; } throw new AssertionError("Unknown type: " + type); @@ -62,13 +64,15 @@ public enum DruidType { /** * Returns a DruidType matching the String from a meta data query * */ - public static DruidType getTypeFromMetaData(String type) { + protected static DruidType getTypeFromMetaData(String type) { assert type != null; switch (type) { case "LONG": return LONG; case "FLOAT": return FLOAT; + case "DOUBLE": + return DOUBLE; case "STRING": return STRING; default: http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractOperatorConversion.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractOperatorConversion.java new file mode 100644 index 0000000..6e35540 --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractOperatorConversion.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.druid; + +import org.apache.calcite.avatica.util.TimeUnitRange; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; + +import com.google.common.collect.ImmutableMap; + +import java.util.Map; +import java.util.TimeZone; + +/** + * Time extract operator conversion for expressions like EXTRACT(timeUnit FROM arg) + * Unit can be SECOND, MINUTE, HOUR, DAY (day of month), + * DOW (day of week), DOY (day of year), WEEK (week of week year), + * MONTH (1 through 12), QUARTER (1 through 4), or YEAR + **/ +public class ExtractOperatorConversion implements DruidSqlOperatorConverter { + private static final Map<TimeUnitRange, String> EXTRACT_UNIT_MAP = + ImmutableMap.<TimeUnitRange, String>builder() + .put(TimeUnitRange.SECOND, "SECOND") + .put(TimeUnitRange.MINUTE, "MINUTE") + .put(TimeUnitRange.HOUR, "HOUR") + .put(TimeUnitRange.DAY, "DAY") + .put(TimeUnitRange.DOW, "DOW") + .put(TimeUnitRange.DOY, "DOY") + .put(TimeUnitRange.WEEK, "WEEK") + .put(TimeUnitRange.MONTH, "MONTH") + .put(TimeUnitRange.QUARTER, "QUARTER") + .put(TimeUnitRange.YEAR, "YEAR") + .build(); + + @Override public SqlOperator calciteOperator() { + return SqlStdOperatorTable.EXTRACT; + } + + @Override public String toDruidExpression( + RexNode rexNode, RelDataType rowType, DruidQuery query) { + + final RexCall call = (RexCall) rexNode; + final RexLiteral flag = (RexLiteral) call.getOperands().get(0); + final TimeUnitRange calciteUnit = (TimeUnitRange) flag.getValue(); + final RexNode arg = call.getOperands().get(1); + + final String input = DruidExpressions.toDruidExpression(arg, rowType, query); + if (input == null) { + return null; + } + + final String druidUnit = EXTRACT_UNIT_MAP.get(calciteUnit); + if (druidUnit == null) { + return null; + } + + return DruidExpressions.applyTimeExtract( + input, druidUnit, TimeZone.getTimeZone(query.getConnectionConfig().timeZone())); + } +} + +// End ExtractOperatorConversion.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java index 601fc89..0aece36 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java @@ -21,6 +21,8 @@ import com.google.common.base.Preconditions; import java.io.IOException; +import javax.annotation.Nullable; + import static org.apache.calcite.adapter.druid.DruidQuery.writeField; import static org.apache.calcite.adapter.druid.DruidQuery.writeFieldIf; @@ -34,18 +36,37 @@ public class ExtractionDimensionSpec implements DimensionSpec { private final String dimension; private final ExtractionFunction extractionFunction; private final String outputName; + private final DruidType outputType; public ExtractionDimensionSpec(String dimension, ExtractionFunction extractionFunction, String outputName) { + this(dimension, extractionFunction, outputName, DruidType.STRING); + } + + public ExtractionDimensionSpec(String dimension, ExtractionFunction extractionFunction, + String outputName, DruidType outputType) { this.dimension = Preconditions.checkNotNull(dimension); this.extractionFunction = Preconditions.checkNotNull(extractionFunction); this.outputName = outputName; + this.outputType = outputType == null ? DruidType.STRING : outputType; } - public String getOutputName() { + @Override public String getOutputName() { return outputName; } + @Override public DruidType getOutputType() { + return outputType; + } + + @Override public ExtractionFunction getExtractionFn() { + return extractionFunction; + } + + @Override public String getDimension() { + return dimension; + } + @Override public void write(JsonGenerator generator) throws IOException { generator.writeStartObject(); generator.writeStringField("type", "extraction"); @@ -55,6 +76,33 @@ public class ExtractionDimensionSpec implements DimensionSpec { generator.writeEndObject(); } + /** + * @param dimensionSpec Druid Dimesion spec object + * + * @return valid {@link Granularity} of floor extract or null when not possible. + */ + @Nullable + public static Granularity toQueryGranularity(DimensionSpec dimensionSpec) { + if (!DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(dimensionSpec.getDimension())) { + // Only __time column can be substituted by granularity + return null; + } + final ExtractionFunction extractionFunction = dimensionSpec.getExtractionFn(); + if (extractionFunction == null) { + // No Extract thus no Granularity + return null; + } + if (extractionFunction instanceof TimeExtractionFunction) { + Granularity granularity = ((TimeExtractionFunction) extractionFunction).getGranularity(); + String format = ((TimeExtractionFunction) extractionFunction).getFormat(); + if (!TimeExtractionFunction.ISO_TIME_FORMAT.equals(format)) { + return null; + } + return granularity; + } + return null; + } + } // End ExtractionDimensionSpec.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java index 8143f8c..d572514 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java @@ -21,7 +21,7 @@ package org.apache.calcite.adapter.druid; * * <p>Extraction functions define the transformation applied to each dimension value. */ -public interface ExtractionFunction extends DruidQuery.Json { +public interface ExtractionFunction extends DruidJson { } // End ExtractionFunction.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/FloorOperatorConversion.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/FloorOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/FloorOperatorConversion.java new file mode 100644 index 0000000..0d8ecc1 --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/FloorOperatorConversion.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.druid; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; + +import java.util.TimeZone; + +import javax.annotation.Nullable; + + +/** + * DruidSqlOperatorConverter implementation that handles Floor operations conversions + */ +public class FloorOperatorConversion implements DruidSqlOperatorConverter { + @Override public SqlOperator calciteOperator() { + return SqlStdOperatorTable.FLOOR; + } + + @Nullable + @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType, + DruidQuery druidQuery) { + final RexCall call = (RexCall) rexNode; + final RexNode arg = call.getOperands().get(0); + final String druidExpression = DruidExpressions.toDruidExpression( + arg, + rowType, + druidQuery); + if (druidExpression == null) { + return null; + } else if (call.getOperands().size() == 1) { + // case FLOOR(expr) + return DruidQuery.format("floor(%s)", druidExpression); + } else if (call.getOperands().size() == 2) { + // FLOOR(expr TO timeUnit) + final Granularity granularity = DruidDateTimeUtils + .extractGranularity(call, druidQuery.getConnectionConfig().timeZone()); + if (granularity == null) { + return null; + } + String isoPeriodFormat = DruidDateTimeUtils.toISOPeriodFormat(granularity.getType()); + if (isoPeriodFormat == null) { + return null; + } + return DruidExpressions.applyTimestampFloor( + druidExpression, + isoPeriodFormat, + "", + TimeZone.getTimeZone(druidQuery.getConnectionConfig().timeZone())); + } else { + return null; + } + } +} + +// End FloorOperatorConversion.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/Granularities.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/Granularities.java b/druid/src/main/java/org/apache/calcite/adapter/druid/Granularities.java index df1b291..2015075 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/Granularities.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/Granularities.java @@ -72,9 +72,7 @@ public class Granularities { INSTANCE; @Override public void write(JsonGenerator generator) throws IOException { - generator.writeStartObject(); - generator.writeStringField("type", "all"); - generator.writeEndObject(); + generator.writeObject("all"); } @Nonnull public Type getType() { http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/Granularity.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/Granularity.java b/druid/src/main/java/org/apache/calcite/adapter/druid/Granularity.java index ffedec2..f70fd18 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/Granularity.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/Granularity.java @@ -32,7 +32,7 @@ import javax.annotation.Nonnull; * * @see Granularities */ -public interface Granularity extends DruidQuery.Json { +public interface Granularity extends DruidJson { /** Type of supported periods for granularity. */ enum Type { ALL, http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/NaryOperatorConverter.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/NaryOperatorConverter.java b/druid/src/main/java/org/apache/calcite/adapter/druid/NaryOperatorConverter.java new file mode 100644 index 0000000..961454b --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/NaryOperatorConverter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.druid; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; + +import com.google.common.base.Preconditions; + +import java.util.List; + +import javax.annotation.Nullable; + +/** + * Converts Calcite n-ary operators to druid expression eg (arg1 Op arg2 Op arg3) + */ +public class NaryOperatorConverter implements DruidSqlOperatorConverter { + private final SqlOperator operator; + private final String druidOperatorName; + + public NaryOperatorConverter(SqlOperator operator, String druidOperatorName) { + this.operator = Preconditions.checkNotNull(operator); + this.druidOperatorName = Preconditions.checkNotNull(druidOperatorName); + } + + @Override public SqlOperator calciteOperator() { + return operator; + } + + @Nullable + @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType, + DruidQuery druidQuery) { + final RexCall call = (RexCall) rexNode; + final List<String> druidExpressions = DruidExpressions.toDruidExpressions( + druidQuery, rowType, + call.getOperands()); + if (druidExpressions == null) { + return null; + } + return DruidExpressions.nAryOperatorCall(druidOperatorName, druidExpressions); + } +} + +// End NaryOperatorConverter.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/SubstringOperatorConversion.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/SubstringOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/SubstringOperatorConversion.java new file mode 100644 index 0000000..d2342f3 --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/SubstringOperatorConversion.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.druid; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; + +import javax.annotation.Nullable; + +/** + * Converts Calcite SUBSTRING call to Druid Expression when possible + */ +public class SubstringOperatorConversion implements DruidSqlOperatorConverter { + @Override public SqlOperator calciteOperator() { + return SqlStdOperatorTable.SUBSTRING; + } + + @Nullable + @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType, + DruidQuery query) { + final RexCall call = (RexCall) rexNode; + final String arg = DruidExpressions.toDruidExpression( + call.getOperands().get(0), rowType, query); + if (arg == null) { + return null; + } + + final int index = RexLiteral.intValue(call.getOperands().get(1)) - 1; + // SQL is 1-indexed, Druid is 0-indexed. + final int length; + if (call.getOperands().size() > 2) { + //case substring from index with length + length = RexLiteral.intValue(call.getOperands().get(2)); + } else { + //case substring from index to the end + length = -1; + } + return DruidQuery.format("substring(%s, %s, %s)", + arg, + DruidExpressions.numberLiteral(index), + DruidExpressions.numberLiteral(length)); + } +} + +// End SubstringOperatorConversion.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java deleted file mode 100644 index 7ef19a6..0000000 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.adapter.druid; - -/** - * DimensionSpec implementation that uses a time format extraction function. - */ -public class TimeExtractionDimensionSpec extends ExtractionDimensionSpec { - - public TimeExtractionDimensionSpec( - ExtractionFunction extractionFunction, String outputName) { - super(DruidTable.DEFAULT_TIMESTAMP_COLUMN, extractionFunction, outputName); - } - - /** - * Creates a time extraction DimensionSpec that renames the '__time' column - * to the given name. - * - * @param outputName name of the output column - * - * @return the time extraction DimensionSpec instance - */ - public static TimeExtractionDimensionSpec makeFullTimeExtract( - String outputName, String timeZone) { - return new TimeExtractionDimensionSpec( - TimeExtractionFunction.createDefault(timeZone), outputName); - } - - /** - * Creates a time extraction DimensionSpec that formats the '__time' column - * according to the given granularity and outputs the column with the given - * name. See {@link TimeExtractionFunction#VALID_TIME_EXTRACT} for set of valid extract - * - * @param granularity granularity to apply to the column - * @param outputName name of the output column - * - * @return time field extraction DimensionSpec instance or null if granularity - * is not supported - */ - public static TimeExtractionDimensionSpec makeTimeExtract( - Granularity granularity, String outputName, String timeZone) { - return new TimeExtractionDimensionSpec( - TimeExtractionFunction.createExtractFromGranularity(granularity, timeZone), outputName); - } - - /** - * Creates floor time extraction dimension spec from Granularity with a given output name - * @param granularity granularity to apply to the time column - * @param outputName name of the output column - * - * @return floor time extraction DimensionSpec instance. - */ - public static TimeExtractionDimensionSpec makeTimeFloor(Granularity granularity, - String outputName, String timeZone) { - ExtractionFunction fn = - TimeExtractionFunction.createFloorFromGranularity(granularity, timeZone); - return new TimeExtractionDimensionSpec(fn, outputName); - } -} - -// End TimeExtractionDimensionSpec.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java index 61e72e0..5b0265e 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java @@ -22,12 +22,18 @@ import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; + import com.fasterxml.jackson.core.JsonGenerator; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import java.io.IOException; import java.util.Locale; +import java.util.TimeZone; + +import javax.annotation.Nullable; import static org.apache.calcite.adapter.druid.DruidQuery.writeFieldIf; @@ -51,6 +57,15 @@ public class TimeExtractionFunction implements ExtractionFunction { TimeUnitRange.MINUTE, TimeUnitRange.SECOND); + private static final ImmutableSet<TimeUnitRange> VALID_TIME_FLOOR = Sets.immutableEnumSet( + TimeUnitRange.YEAR, + TimeUnitRange.MONTH, + TimeUnitRange.DAY, + TimeUnitRange.WEEK, + TimeUnitRange.HOUR, + TimeUnitRange.MINUTE, + TimeUnitRange.SECOND); + public static final String ISO_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; private final String format; @@ -76,6 +91,14 @@ public class TimeExtractionFunction implements ExtractionFunction { generator.writeEndObject(); } + public String getFormat() { + return format; + } + public Granularity getGranularity() { + return granularity; + } + + /** * Creates the default time format extraction function. * @@ -94,7 +117,7 @@ public class TimeExtractionFunction implements ExtractionFunction { */ public static TimeExtractionFunction createExtractFromGranularity( Granularity granularity, String timeZone) { - final String local = Locale.ROOT.toLanguageTag(); + final String local = Locale.US.toLanguageTag(); switch (granularity.getType()) { case DAY: return new TimeExtractionFunction("d", null, timeZone, local); @@ -135,11 +158,12 @@ public class TimeExtractionFunction implements ExtractionFunction { * * @return true if the extract unit is valid */ + public static boolean isValidTimeExtract(RexNode rexNode) { - if (rexNode.getKind() != SqlKind.EXTRACT) { + final RexCall call = (RexCall) rexNode; + if (call.getKind() != SqlKind.EXTRACT || call.getOperands().size() != 2) { return false; } - final RexCall call = (RexCall) rexNode; final RexLiteral flag = (RexLiteral) call.operands.get(0); final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue(); return timeUnit != null && VALID_TIME_EXTRACT.contains(timeUnit); @@ -163,7 +187,38 @@ public class TimeExtractionFunction implements ExtractionFunction { } final RexLiteral flag = (RexLiteral) call.operands.get(1); final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue(); - return timeUnit != null && VALID_TIME_EXTRACT.contains(timeUnit); + return timeUnit != null && VALID_TIME_FLOOR.contains(timeUnit); + } + + /** + * @param rexNode cast RexNode + * @param timeZone timezone + * + * @return Druid Time extraction function or null when can not translate the cast. + */ + @Nullable + public static TimeExtractionFunction translateCastToTimeExtract(RexNode rexNode, + TimeZone timeZone) { + assert rexNode.getKind() == SqlKind.CAST; + final RexCall rexCall = (RexCall) rexNode; + final String castFormat = DruidSqlCastConverter + .dateTimeFormatString(rexCall.getType().getSqlTypeName()); + final String timeZoneId = timeZone == null ? null : timeZone.getID(); + if (castFormat == null) { + // unknown format + return null; + } + if (rexCall.getType().getFamily() == SqlTypeFamily.DATE) { + return new TimeExtractionFunction(castFormat, + Granularities.createGranularity(TimeUnitRange.DAY, timeZoneId), timeZoneId, + Locale.ENGLISH.toString()); + } + if (rexCall.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP + || rexCall.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) { + return new TimeExtractionFunction(castFormat, null, timeZoneId, Locale.ENGLISH.toString()); + } + + return null; } } http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/UnaryPrefixOperatorConversion.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/UnaryPrefixOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/UnaryPrefixOperatorConversion.java new file mode 100644 index 0000000..a8e5da3 --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/UnaryPrefixOperatorConversion.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.druid; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; + +import com.google.common.collect.Iterables; + +import java.util.List; + +/** + * Unary prefix Operator conversion class used to convert expression like Unary NOT and Minus + */ +public class UnaryPrefixOperatorConversion implements DruidSqlOperatorConverter { + + private final SqlOperator operator; + private final String druidOperator; + + public UnaryPrefixOperatorConversion(final SqlOperator operator, final String druidOperator) { + this.operator = operator; + this.druidOperator = druidOperator; + } + + @Override public SqlOperator calciteOperator() { + return operator; + } + + @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType, + DruidQuery druidQuery) { + + final RexCall call = (RexCall) rexNode; + + final List<String> druidExpressions = DruidExpressions.toDruidExpressions( + druidQuery, rowType, + call.getOperands()); + + if (druidExpressions == null) { + return null; + } + + return DruidQuery + .format("(%s %s)", druidOperator, Iterables.getOnlyElement(druidExpressions)); + } +} + +// End UnaryPrefixOperatorConversion.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/UnarySuffixOperatorConversion.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/UnarySuffixOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/UnarySuffixOperatorConversion.java new file mode 100644 index 0000000..015415f --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/UnarySuffixOperatorConversion.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.druid; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; + +import com.google.common.collect.Iterables; + +import java.util.List; + +/** + * Unary suffix operator conversion, used to convert function like: expression Unary_Operator + */ +public class UnarySuffixOperatorConversion implements DruidSqlOperatorConverter { + private final SqlOperator operator; + private final String druidOperator; + + public UnarySuffixOperatorConversion(SqlOperator operator, String druidOperator) { + this.operator = operator; + this.druidOperator = druidOperator; + } + + @Override public SqlOperator calciteOperator() { + return operator; + } + + @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType, + DruidQuery druidQuery) { + final RexCall call = (RexCall) rexNode; + + final List<String> druidExpressions = DruidExpressions.toDruidExpressions( + druidQuery, rowType, + call.getOperands()); + + if (druidExpressions == null) { + return null; + } + + return DruidQuery.format( + "(%s %s)", + Iterables.getOnlyElement(druidExpressions), druidOperator); + } +} + +// End UnarySuffixOperatorConversion.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/VirtualColumn.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/VirtualColumn.java b/druid/src/main/java/org/apache/calcite/adapter/druid/VirtualColumn.java new file mode 100644 index 0000000..7348cec --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/VirtualColumn.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.druid; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.util.Locale; + +import static org.apache.calcite.adapter.druid.DruidQuery.writeFieldIf; + +/** + * Druid Json Expression based Virtual Column. + * Virtual columns is used as "projection" concept throughout Druid using expression. + */ +public class VirtualColumn implements DruidJson { + private final String name; + + private final String expression; + + private final DruidType outputType; + + public VirtualColumn(String name, String expression, DruidType outputType) { + this.name = Preconditions.checkNotNull(name); + this.expression = Preconditions.checkNotNull(expression); + this.outputType = outputType == null ? DruidType.FLOAT : outputType; + } + + @Override public void write(JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeStringField("type", "expression"); + generator.writeStringField("name", name); + generator.writeStringField("expression", expression); + writeFieldIf(generator, "outputType", getOutputType().toString().toUpperCase(Locale.ENGLISH)); + generator.writeEndObject(); + } + + public String getName() { + return name; + } + + public String getExpression() { + return expression; + } + + public DruidType getOutputType() { + return outputType; + } + + /** + * Virtual Column Builder + */ + public static class Builder { + private String name; + + private String expression; + + private DruidType type; + + public Builder withName(String name) { + this.name = name; + return this; + } + + public Builder withExpression(String expression) { + this.expression = expression; + return this; + } + + public Builder withType(DruidType type) { + this.type = type; + return this; + } + + public VirtualColumn build() { + return new VirtualColumn(name, expression, type); + } + } + + public static Builder builder() { + return new Builder(); + } +} + +// End VirtualColumn.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java ---------------------------------------------------------------------- diff --git a/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java b/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java index e8e42be..16e1f59 100644 --- a/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java +++ b/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java @@ -16,6 +16,7 @@ */ package org.apache.calcite.adapter.druid; +import org.apache.calcite.config.CalciteConnectionConfig; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeSystem; @@ -30,13 +31,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import java.io.IOException; import java.io.StringWriter; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.math.BigDecimal; import java.util.List; @@ -47,8 +47,22 @@ import static org.hamcrest.core.Is.is; */ public class DruidQueryFilterTest { - @Test public void testInFilter() throws NoSuchMethodException, - InvocationTargetException, IllegalAccessException, IOException { + private DruidQuery druidQuery; + @Before + public void testSetup() { + druidQuery = Mockito.mock(DruidQuery.class); + final CalciteConnectionConfig connectionConfigMock = Mockito + .mock(CalciteConnectionConfig.class); + Mockito.when(connectionConfigMock.timeZone()).thenReturn("UTC"); + Mockito.when(druidQuery.getConnectionConfig()).thenReturn(connectionConfigMock); + Mockito.when(druidQuery.getDruidTable()) + .thenReturn( + new DruidTable(Mockito.mock(DruidSchema.class), "dataSource", null, + ImmutableSet.<String>of(), "timestamp", null, null, + null + )); + } + @Test public void testInFilter() throws IOException { final Fixture f = new Fixture(); final List<? extends RexNode> listRexNodes = ImmutableList.of(f.rexBuilder.makeInputRef(f.varcharRowType, 0), @@ -58,13 +72,9 @@ public class DruidQueryFilterTest { RexNode inRexNode = f.rexBuilder.makeCall(SqlStdOperatorTable.IN, listRexNodes); - Method translateFilter = - DruidQuery.Translator.class.getDeclaredMethod("translateFilter", - RexNode.class); - translateFilter.setAccessible(true); - DruidQuery.JsonInFilter returnValue = - (DruidQuery.JsonInFilter) translateFilter.invoke(f.translatorStringKind, - inRexNode); + DruidJsonFilter returnValue = DruidJsonFilter + .toDruidFilters(inRexNode, f.varcharRowType, druidQuery); + Assert.assertNotNull("Filter is null", returnValue); JsonFactory jsonFactory = new JsonFactory(); final StringWriter sw = new StringWriter(); JsonGenerator jsonGenerator = jsonFactory.createGenerator(sw); @@ -76,8 +86,7 @@ public class DruidQueryFilterTest { + "\"values\":[\"1\",\"5\",\"value1\"]}")); } - @Test public void testBetweenFilterStringCase() throws NoSuchMethodException, - InvocationTargetException, IllegalAccessException, IOException { + @Test public void testBetweenFilterStringCase() throws IOException { final Fixture f = new Fixture(); final List<RexNode> listRexNodes = ImmutableList.of(f.rexBuilder.makeLiteral(false), @@ -88,13 +97,9 @@ public class DruidQueryFilterTest { RexNode betweenRexNode = f.rexBuilder.makeCall(relDataType, SqlStdOperatorTable.BETWEEN, listRexNodes); - Method translateFilter = - DruidQuery.Translator.class.getDeclaredMethod("translateFilter", - RexNode.class); - translateFilter.setAccessible(true); - DruidQuery.JsonBound returnValue = - (DruidQuery.JsonBound) translateFilter.invoke(f.translatorStringKind, - betweenRexNode); + DruidJsonFilter returnValue = DruidJsonFilter + .toDruidFilters(betweenRexNode, f.varcharRowType, druidQuery); + Assert.assertNotNull("Filter is null", returnValue); JsonFactory jsonFactory = new JsonFactory(); final StringWriter sw = new StringWriter(); JsonGenerator jsonGenerator = jsonFactory.createGenerator(sw); @@ -120,8 +125,6 @@ public class DruidQueryFilterTest { final RelDataType varcharRowType = typeFactory.builder() .add("dimensionName", varcharType) .build(); - final DruidQuery.Translator translatorStringKind = - new DruidQuery.Translator(druidTable, varcharRowType, "UTC"); } }
