http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java deleted file mode 100644 index 43982aa..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java +++ /dev/null @@ -1,1053 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.optimizer.calcite.druid; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.regex.Pattern; - -import org.apache.calcite.linq4j.Ord; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptCost; -import org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelFieldCollation; -import org.apache.calcite.rel.RelFieldCollation.Direction; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelWriter; -import org.apache.calcite.rel.core.Aggregate; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.Sort; -import org.apache.calcite.rel.core.TableScan; -import org.apache.calcite.rel.metadata.RelMetadataQuery; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexUtil; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.tools.RelBuilder; -import org.apache.calcite.util.ImmutableBitSet; -import org.apache.calcite.util.Litmus; -import org.apache.calcite.util.Pair; -import org.apache.calcite.util.Util; -import org.apache.hadoop.hive.conf.Constants; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveDateGranularity; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; -import org.joda.time.Interval; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - -/** - * Relational expression representing a scan of a Druid data set. - * - * TODO: to be removed when Calcite is upgraded to 1.9 - */ -public class DruidQuery extends TableScan { - - protected static final Logger LOG = LoggerFactory.getLogger(DruidQuery.class); - - protected QuerySpec querySpec; - - final DruidTable druidTable; - final List<Interval> intervals; - final ImmutableList<RelNode> rels; - - private static final Pattern VALID_SIG = Pattern.compile("sf?p?a?l?"); - - /** - * Creates a DruidQuery. - * - * @param cluster Cluster - * @param traitSet Traits - * @param table Table - * @param druidTable Druid table - * @param interval Interval for the query - * @param rels Internal relational expressions - */ - private DruidQuery(RelOptCluster cluster, RelTraitSet traitSet, - RelOptTable table, DruidTable druidTable, - List<Interval> intervals, List<RelNode> rels) { - super(cluster, traitSet, table); - this.druidTable = druidTable; - this.intervals = ImmutableList.copyOf(intervals); - this.rels = ImmutableList.copyOf(rels); - - assert isValid(Litmus.THROW); - } - - /** Returns a string describing the operations inside this query. - * - * <p>For example, "sfpal" means {@link TableScan} (s) - * followed by {@link Filter} (f) - * followed by {@link Project} (p) - * followed by {@link Aggregate} (a) - * followed by {@link Sort} (l). - * - * @see #isValidSignature(String) - */ - String signature() { - final StringBuilder b = new StringBuilder(); - for (RelNode rel : rels) { - b.append(rel instanceof TableScan ? 's' - : rel instanceof Project ? 'p' - : rel instanceof Filter ? 'f' - : rel instanceof Aggregate ? 'a' - : rel instanceof Sort ? 'l' - : '!'); - } - return b.toString(); - } - - @Override public boolean isValid(Litmus litmus) { - if (!super.isValid(litmus)) { - return false; - } - final String signature = signature(); - if (!isValidSignature(signature)) { - return litmus.fail("invalid signature"); - } - if (rels.isEmpty()) { - return litmus.fail("must have at least one rel"); - } - for (int i = 0; i < rels.size(); i++) { - final RelNode r = rels.get(i); - if (i == 0) { - if (!(r instanceof TableScan)) { - return litmus.fail("first rel must be TableScan"); - } - if (r.getTable() != table) { - return litmus.fail("first rel must be based on table table"); - } - } else { - final List<RelNode> inputs = r.getInputs(); - if (inputs.size() != 1 || inputs.get(0) != rels.get(i - 1)) { - return litmus.fail("each rel must have a single input"); - } - if (r instanceof Aggregate) { - final Aggregate aggregate = (Aggregate) r; - if (aggregate.getGroupSets().size() != 1 - || aggregate.indicator) { - return litmus.fail("no grouping sets"); - } - for (AggregateCall call : aggregate.getAggCallList()) { - if (call.filterArg >= 0) { - return litmus.fail("no filtered aggregate functions"); - } - } - } - if (r instanceof Filter) { - final Filter filter = (Filter) r; - if (!isValidFilter(filter.getCondition())) { - return litmus.fail("invalid filter"); - } - } - if (r instanceof Sort) { - final Sort sort = (Sort) r; - if (sort.offset != null && RexLiteral.intValue(sort.offset) != 0) { - return litmus.fail("offset not supported"); - } - } - } - } - return true; - } - - boolean isValidFilter(RexNode e) { - switch (e.getKind()) { - case INPUT_REF: - case LITERAL: - return true; - case AND: - case OR: - case NOT: - case EQUALS: - case LESS_THAN: - case LESS_THAN_OR_EQUAL: - case GREATER_THAN: - case GREATER_THAN_OR_EQUAL: - case BETWEEN: - case IN: - case CAST: - return areValidFilters(((RexCall) e).getOperands()); - default: - return false; - } - } - - private boolean areValidFilters(List<RexNode> es) { - for (RexNode e : es) { - if (!isValidFilter(e)) { - return false; - } - } - return true; - } - - /** Returns whether a signature represents an sequence of relational operators - * that can be translated into a valid Druid query. */ - static boolean isValidSignature(String signature) { - return VALID_SIG.matcher(signature).matches(); - } - - /** Creates a DruidQuery. */ - public static DruidQuery create(RelOptCluster cluster, RelTraitSet traitSet, - RelOptTable table, DruidTable druidTable, List<RelNode> rels) { - return new DruidQuery(cluster, traitSet, table, druidTable, druidTable.intervals, rels); - } - - /** Creates a DruidQuery. */ - private static DruidQuery create(RelOptCluster cluster, RelTraitSet traitSet, - RelOptTable table, DruidTable druidTable, List<Interval> intervals, List<RelNode> rels) { - return new DruidQuery(cluster, traitSet, table, druidTable, intervals, rels); - } - - /** Extends a DruidQuery. */ - public static DruidQuery extendQuery(DruidQuery query, RelNode r) { - final ImmutableList.Builder<RelNode> builder = ImmutableList.builder(); - return DruidQuery.create(query.getCluster(), query.getTraitSet(), query.getTable(), - query.druidTable, query.intervals, builder.addAll(query.rels).add(r).build()); - } - - /** Extends a DruidQuery. */ - public static DruidQuery extendQuery(DruidQuery query, List<Interval> intervals) { - return DruidQuery.create(query.getCluster(), query.getTraitSet(), query.getTable(), - query.druidTable, intervals, query.rels); - } - - @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { - assert inputs.isEmpty(); - return this; - } - - @Override public RelDataType deriveRowType() { - return getCluster().getTypeFactory().createStructType( - Pair.right(Util.last(rels).getRowType().getFieldList()), - getQuerySpec().fieldNames); - } - - public TableScan getTableScan() { - return (TableScan) rels.get(0); - } - - public RelNode getTopNode() { - return Util.last(rels); - } - - @Override public RelOptTable getTable() { - return table; - } - - @Override public RelWriter explainTerms(RelWriter pw) { - for (RelNode rel : rels) { - if (rel instanceof TableScan) { - TableScan tableScan = (TableScan) rel; - pw.item("table", tableScan.getTable().getQualifiedName()); - pw.item("intervals", intervals); - } else if (rel instanceof Filter) { - pw.item("filter", ((Filter) rel).getCondition()); - } else if (rel instanceof Project) { - pw.item("projects", ((Project) rel).getProjects()); - } else if (rel instanceof Aggregate) { - final Aggregate aggregate = (Aggregate) rel; - pw.item("groups", aggregate.getGroupSet()) - .item("aggs", aggregate.getAggCallList()); - } else if (rel instanceof Sort) { - final Sort sort = (Sort) rel; - for (Ord<RelFieldCollation> ord - : Ord.zip(sort.collation.getFieldCollations())) { - pw.item("sort" + ord.i, ord.e.getFieldIndex()); - } - for (Ord<RelFieldCollation> ord - : Ord.zip(sort.collation.getFieldCollations())) { - pw.item("dir" + ord.i, ord.e.shortString()); - } - pw.itemIf("fetch", sort.fetch, sort.fetch != null); - } else { - throw new AssertionError("rel type not supported in Druid query " - + rel); - } - } - return pw; - } - - @Override public RelOptCost computeSelfCost(RelOptPlanner planner, - RelMetadataQuery mq) { - // Heuristic: we assume pushing query to Druid reduces cost by 90% - return Util.last(rels).computeSelfCost(planner, mq).multiplyBy(.1); - } - - @Override public RelNode project(ImmutableBitSet fieldsUsed, - Set<RelDataTypeField> extraFields, - RelBuilder relBuilder) { - final int fieldCount = getRowType().getFieldCount(); - if (fieldsUsed.equals(ImmutableBitSet.range(fieldCount)) - && extraFields.isEmpty()) { - return this; - } - final List<RexNode> exprList = new ArrayList<>(); - final List<String> nameList = new ArrayList<>(); - final RexBuilder rexBuilder = getCluster().getRexBuilder(); - final List<RelDataTypeField> fields = getRowType().getFieldList(); - - // Project the subset of fields. - for (int i : fieldsUsed) { - RelDataTypeField field = fields.get(i); - exprList.add(rexBuilder.makeInputRef(this, i)); - nameList.add(field.getName()); - } - - // Project nulls for the extra fields. (Maybe a sub-class table has - // extra fields, but we don't.) - for (RelDataTypeField extraField : extraFields) { - exprList.add( - rexBuilder.ensureType( - extraField.getType(), - rexBuilder.constantNull(), - true)); - nameList.add(extraField.getName()); - } - - HiveProject hp = (HiveProject) relBuilder.push(this).project(exprList, nameList).build(); - hp.setSynthetic(); - return hp; - } - - public QuerySpec getQuerySpec() { - if (querySpec == null) { - querySpec = deriveQuerySpec(); - assert querySpec != null : this; - } - return querySpec; - } - - protected QuerySpec deriveQuerySpec() { - final RelDataType rowType = table.getRowType(); - int i = 1; - - RexNode filter = null; - if (i < rels.size() && rels.get(i) instanceof Filter) { - final Filter filterRel = (Filter) rels.get(i++); - filter = filterRel.getCondition(); - } - - List<RexNode> projects = null; - if (i < rels.size() && rels.get(i) instanceof Project) { - final Project project = (Project) rels.get(i++); - projects = project.getProjects(); - } - - ImmutableBitSet groupSet = null; - List<AggregateCall> aggCalls = null; - List<String> aggNames = null; - if (i < rels.size() && rels.get(i) instanceof Aggregate) { - final Aggregate aggregate = (Aggregate) rels.get(i++); - groupSet = aggregate.getGroupSet(); - aggCalls = aggregate.getAggCallList(); - aggNames = Util.skip(aggregate.getRowType().getFieldNames(), - groupSet.cardinality()); - } - - List<Integer> collationIndexes = null; - List<Direction> collationDirections = null; - Integer fetch = null; - if (i < rels.size() && rels.get(i) instanceof Sort) { - final Sort sort = (Sort) rels.get(i++); - collationIndexes = new ArrayList<>(); - collationDirections = new ArrayList<>(); - for (RelFieldCollation fCol: sort.collation.getFieldCollations()) { - collationIndexes.add(fCol.getFieldIndex()); - collationDirections.add(fCol.getDirection()); - } - fetch = sort.fetch != null ? RexLiteral.intValue(sort.fetch) : null; - } - - if (i != rels.size()) { - throw new AssertionError("could not implement all rels"); - } - - return getQuery(rowType, filter, projects, groupSet, aggCalls, aggNames, - collationIndexes, collationDirections, fetch); - } - - public String getQueryType() { - return getQuerySpec().queryType.getQueryName(); - } - - public String getQueryString() { - return getQuerySpec().queryString; - } - - private QuerySpec getQuery(RelDataType rowType, RexNode filter, List<RexNode> projects, - ImmutableBitSet groupSet, List<AggregateCall> aggCalls, List<String> aggNames, - List<Integer> collationIndexes, List<Direction> collationDirections, Integer fetch) { - DruidQueryType queryType = DruidQueryType.SELECT; - final Translator translator = new Translator(druidTable, rowType); - List<String> fieldNames = rowType.getFieldNames(); - - // Handle filter - Json jsonFilter = null; - if (filter != null) { - jsonFilter = translator.translateFilter(filter); - } - - // Then we handle project - if (projects != null) { - translator.metrics.clear(); - translator.dimensions.clear(); - final ImmutableList.Builder<String> builder = ImmutableList.builder(); - for (RexNode project : projects) { - builder.add(translator.translate(project, true)); - } - fieldNames = builder.build(); - } - - // Finally we handle aggregate and sort. Handling of these - // operators is more complex, since we need to extract - // the conditions to know whether the query will be - // executed as a Timeseries, TopN, or GroupBy in Druid - final List<String> dimensions = new ArrayList<>(); - final List<JsonAggregation> aggregations = new ArrayList<>(); - String granularity = "ALL"; - Direction timeSeriesDirection = null; - JsonLimit limit = null; - if (groupSet != null) { - assert aggCalls != null; - assert aggNames != null; - assert aggCalls.size() == aggNames.size(); - - int timePositionIdx = -1; - final ImmutableList.Builder<String> builder = ImmutableList.builder(); - if (projects != null) { - for (int groupKey : groupSet) { - final String s = fieldNames.get(groupKey); - final RexNode project = projects.get(groupKey); - if (project instanceof RexInputRef) { - // Reference, it could be to the timestamp column or any other dimension - final RexInputRef ref = (RexInputRef) project; - final String origin = druidTable.rowType.getFieldList().get(ref.getIndex()).getName(); - if (origin.equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) { - granularity = "NONE"; - builder.add(s); - assert timePositionIdx == -1; - timePositionIdx = groupKey; - } else { - dimensions.add(s); - builder.add(s); - } - } else if (project instanceof RexCall) { - // Call, check if we should infer granularity - RexCall call = (RexCall) project; - if (HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator())) { - granularity = call.getOperator().getName(); - builder.add(s); - assert timePositionIdx == -1; - timePositionIdx = groupKey; - } else { - dimensions.add(s); - builder.add(s); - } - } else { - throw new AssertionError("incompatible project expression: " + project); - } - } - } else { - for (int groupKey : groupSet) { - final String s = fieldNames.get(groupKey); - if (s.equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) { - granularity = "NONE"; - builder.add(s); - assert timePositionIdx == -1; - timePositionIdx = groupKey; - } else { - dimensions.add(s); - builder.add(s); - } - } - } - - for (Pair<AggregateCall, String> agg : Pair.zip(aggCalls, aggNames)) { - final JsonAggregation jsonAggregation = - getJsonAggregation(fieldNames, agg.right, agg.left); - aggregations.add(jsonAggregation); - builder.add(jsonAggregation.name); - } - - fieldNames = builder.build(); - - ImmutableList<JsonCollation> collations = null; - boolean sortsMetric = false; - if (collationIndexes != null) { - assert collationDirections != null; - ImmutableList.Builder<JsonCollation> colBuilder = new ImmutableList.Builder<JsonCollation>(); - for (Pair<Integer,Direction> p : Pair.zip(collationIndexes, collationDirections)) { - colBuilder.add(new JsonCollation(fieldNames.get(p.left), - p.right == Direction.DESCENDING ? "descending" : "ascending")); - if (p.left >= groupSet.cardinality() && p.right == Direction.DESCENDING) { - // Currently only support for DESC in TopN - sortsMetric = true; - } else if (p.left == timePositionIdx) { - assert timeSeriesDirection == null; - timeSeriesDirection = p.right; - } - } - collations = colBuilder.build(); - } - - limit = new JsonLimit("default", fetch, collations); - - if (dimensions.isEmpty() && (collations == null || timeSeriesDirection != null)) { - queryType = DruidQueryType.TIMESERIES; - assert fetch == null; - } else if (dimensions.size() == 1 && sortsMetric && collations.size() == 1 && fetch != null) { - queryType = DruidQueryType.TOP_N; - } else { - queryType = DruidQueryType.GROUP_BY; - } - } else { - assert aggCalls == null; - assert aggNames == null; - assert collationIndexes == null || collationIndexes.isEmpty(); - assert collationDirections == null || collationDirections.isEmpty(); - } - - final StringWriter sw = new StringWriter(); - final JsonFactory factory = new JsonFactory(); - try { - final JsonGenerator generator = factory.createGenerator(sw); - - switch (queryType) { - case TIMESERIES: - generator.writeStartObject(); - - generator.writeStringField("queryType", "timeseries"); - generator.writeStringField("dataSource", druidTable.dataSource); - generator.writeStringField("descending", timeSeriesDirection != null && - timeSeriesDirection == Direction.DESCENDING ? "true" : "false"); - generator.writeStringField("granularity", granularity); - writeFieldIf(generator, "filter", jsonFilter); - writeField(generator, "aggregations", aggregations); - writeFieldIf(generator, "postAggregations", null); - writeField(generator, "intervals", intervals); - - generator.writeEndObject(); - break; - - case TOP_N: - generator.writeStartObject(); - - generator.writeStringField("queryType", "topN"); - generator.writeStringField("dataSource", druidTable.dataSource); - generator.writeStringField("granularity", granularity); - generator.writeStringField("dimension", dimensions.get(0)); - generator.writeStringField("metric", fieldNames.get(collationIndexes.get(0))); - writeFieldIf(generator, "filter", jsonFilter); - writeField(generator, "aggregations", aggregations); - writeFieldIf(generator, "postAggregations", null); - writeField(generator, "intervals", intervals); - generator.writeNumberField("threshold", fetch); - - generator.writeEndObject(); - break; - - case GROUP_BY: - generator.writeStartObject(); - - if (aggregations.isEmpty()) { - // Druid requires at least one aggregation, otherwise gives: - // Must have at least one AggregatorFactory - aggregations.add( - new JsonAggregation("longSum", "dummy_agg", "dummy_agg")); - } - - generator.writeStringField("queryType", "groupBy"); - generator.writeStringField("dataSource", druidTable.dataSource); - generator.writeStringField("granularity", granularity); - writeField(generator, "dimensions", dimensions); - writeFieldIf(generator, "limitSpec", limit); - writeFieldIf(generator, "filter", jsonFilter); - writeField(generator, "aggregations", aggregations); - writeFieldIf(generator, "postAggregations", null); - writeField(generator, "intervals", intervals); - writeFieldIf(generator, "having", null); - - generator.writeEndObject(); - break; - - case SELECT: - generator.writeStartObject(); - - generator.writeStringField("queryType", "select"); - generator.writeStringField("dataSource", druidTable.dataSource); - generator.writeStringField("descending", "false"); - writeField(generator, "intervals", intervals); - writeFieldIf(generator, "filter", jsonFilter); - writeField(generator, "dimensions", translator.dimensions); - writeField(generator, "metrics", translator.metrics); - generator.writeStringField("granularity", granularity); - - generator.writeFieldName("pagingSpec"); - generator.writeStartObject(); - generator.writeNumberField("threshold", fetch != null ? fetch : 1); - generator.writeEndObject(); - - generator.writeFieldName("context"); - generator.writeStartObject(); - generator.writeBooleanField(Constants.DRUID_QUERY_FETCH, fetch != null); - generator.writeEndObject(); - - generator.writeEndObject(); - break; - - default: - throw new AssertionError("unknown query type " + queryType); - } - - generator.close(); - } catch (IOException e) { - e.printStackTrace(); - } - - return new QuerySpec(queryType, sw.toString(), fieldNames); - } - - private JsonAggregation getJsonAggregation(List<String> fieldNames, - String name, AggregateCall aggCall) { - final List<String> list = new ArrayList<>(); - for (Integer arg : aggCall.getArgList()) { - list.add(fieldNames.get(arg)); - } - final String only = Iterables.getFirst(list, null); - final boolean b = aggCall.getType().getSqlTypeName() == SqlTypeName.DOUBLE; - switch (aggCall.getAggregation().getKind()) { - case COUNT: - if (aggCall.isDistinct()) { - return new JsonCardinalityAggregation("cardinality", name, list); - } - return new JsonAggregation("count", name, only); - case SUM: - case SUM0: - return new JsonAggregation(b ? "doubleSum" : "longSum", name, only); - case MIN: - return new JsonAggregation(b ? "doubleMin" : "longMin", name, only); - case MAX: - return new JsonAggregation(b ? "doubleMax" : "longMax", name, only); - default: - throw new AssertionError("unknown aggregate " + aggCall); - } - } - - private static void writeField(JsonGenerator generator, String fieldName, - Object o) throws IOException { - generator.writeFieldName(fieldName); - writeObject(generator, o); - } - - private static void writeFieldIf(JsonGenerator generator, String fieldName, - Object o) throws IOException { - if (o != null) { - writeField(generator, fieldName, o); - } - } - - private static void writeArray(JsonGenerator generator, List<?> elements) - throws IOException { - generator.writeStartArray(); - for (Object o : elements) { - writeObject(generator, o); - } - generator.writeEndArray(); - } - - private static void writeObject(JsonGenerator generator, Object o) - throws IOException { - if (o instanceof String) { - String s = (String) o; - generator.writeString(s); - } else if (o instanceof Interval) { - Interval i = (Interval) o; - generator.writeString(i.toString()); - } else if (o instanceof Integer) { - Integer i = (Integer) o; - generator.writeNumber(i); - } else if (o instanceof List) { - writeArray(generator, (List<?>) o); - } else if (o instanceof Json) { - ((Json) o).write(generator); - } else { - throw new AssertionError("not a json object: " + o); - } - } - - /** Druid query specification. */ - public static class QuerySpec { - final DruidQueryType queryType; - final String queryString; - final List<String> fieldNames; - - QuerySpec(DruidQueryType queryType, String queryString, - List<String> fieldNames) { - this.queryType = Preconditions.checkNotNull(queryType); - this.queryString = Preconditions.checkNotNull(queryString); - this.fieldNames = ImmutableList.copyOf(fieldNames); - } - - @Override public int hashCode() { - return Objects.hash(queryType, queryString, fieldNames); - } - - @Override public boolean equals(Object obj) { - return obj == this - || obj instanceof QuerySpec - && queryType == ((QuerySpec) obj).queryType - && queryString.equals(((QuerySpec) obj).queryString) - && fieldNames.equals(((QuerySpec) obj).fieldNames); - } - - @Override public String toString() { - return "{queryType: " + queryType - + ", queryString: " + queryString - + ", fieldNames: " + fieldNames + "}"; - } - - String getQueryString(String pagingIdentifier, int offset) { - if (pagingIdentifier == null) { - return queryString; - } - return queryString.replace("\"threshold\":", - "\"pagingIdentifiers\":{\"" + pagingIdentifier + "\":" + offset - + "},\"threshold\":"); - } - } - - /** Translates scalar expressions to Druid field references. */ - private static class Translator { - final List<String> dimensions = new ArrayList<>(); - final List<String> metrics = new ArrayList<>(); - final DruidTable druidTable; - final RelDataType rowType; - - Translator(DruidTable druidTable, RelDataType rowType) { - this.druidTable = druidTable; - this.rowType = rowType; - for (RelDataTypeField f : rowType.getFieldList()) { - final String fieldName = f.getName(); - if (druidTable.metricFieldNames.contains(fieldName)) { - metrics.add(fieldName); - } else if (!DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(fieldName)) { - dimensions.add(fieldName); - } - } - } - - String translate(RexNode e, boolean set) { - switch (e.getKind()) { - case INPUT_REF: - final RexInputRef ref = (RexInputRef) e; - final String fieldName = - rowType.getFieldList().get(ref.getIndex()).getName(); - if (set) { - if (druidTable.metricFieldNames.contains(fieldName)) { - metrics.add(fieldName); - } else if (!DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(fieldName)) { - dimensions.add(fieldName); - } - } - return fieldName; - - case CAST: - return tr(e, 0, set); - - case LITERAL: - return ((RexLiteral) e).getValue2().toString(); - - case OTHER_FUNCTION: - final RexCall call = (RexCall) e; - assert HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator()); - return tr(call, 0, set); - - default: - throw new AssertionError("invalid expression " + e); - } - } - - @SuppressWarnings("incomplete-switch") - private JsonFilter translateFilter(RexNode e) { - RexCall call; - switch (e.getKind()) { - case EQUALS: - case NOT_EQUALS: - case GREATER_THAN: - case GREATER_THAN_OR_EQUAL: - case LESS_THAN: - case LESS_THAN_OR_EQUAL: - call = (RexCall) e; - int posRef; - int posConstant; - if (RexUtil.isConstant(call.getOperands().get(1))) { - posRef = 0; - posConstant = 1; - } else if (RexUtil.isConstant(call.getOperands().get(0))) { - posRef = 1; - posConstant = 0; - } else { - throw new AssertionError("it is not a valid comparison: " + e); - } - switch (e.getKind()) { - case EQUALS: - return new JsonSelector("selector", tr(e, posRef), tr(e, posConstant)); - case NOT_EQUALS: - return new JsonCompositeFilter("not", - ImmutableList.of(new JsonSelector("selector", tr(e, posRef), tr(e, posConstant)))); - case GREATER_THAN: - return new JsonBound("bound", tr(e, posRef), tr(e, posConstant), true, null, false, - false); - case GREATER_THAN_OR_EQUAL: - return new JsonBound("bound", tr(e, posRef), tr(e, posConstant), false, null, false, - false); - case LESS_THAN: - return new JsonBound("bound", tr(e, posRef), null, false, tr(e, posConstant), true, - false); - case LESS_THAN_OR_EQUAL: - return new JsonBound("bound", tr(e, posRef), null, false, tr(e, posConstant), false, - false); - } - case AND: - case OR: - case NOT: - call = (RexCall) e; - return new JsonCompositeFilter(e.getKind().toString().toLowerCase(), - translateFilters(call.getOperands())); - default: - throw new AssertionError("cannot translate filter: " + e); - } - } - - private String tr(RexNode call, int index) { - return tr(call, index, false); - } - - private String tr(RexNode call, int index, boolean set) { - return translate(((RexCall) call).getOperands().get(index), set); - } - - private List<JsonFilter> translateFilters(List<RexNode> operands) { - final ImmutableList.Builder<JsonFilter> builder = - ImmutableList.builder(); - for (RexNode operand : operands) { - builder.add(translateFilter(operand)); - } - return builder.build(); - } - } - - /** Object that knows how to write itself to a - * {@link com.fasterxml.jackson.core.JsonGenerator}. */ - private interface Json { - void write(JsonGenerator generator) throws IOException; - } - - /** Aggregation element of a Druid "groupBy" or "topN" query. */ - private static class JsonAggregation implements Json { - final String type; - final String name; - final String fieldName; - - private JsonAggregation(String type, String name, String fieldName) { - this.type = type; - this.name = name; - this.fieldName = fieldName; - } - - public void write(JsonGenerator generator) throws IOException { - generator.writeStartObject(); - generator.writeStringField("type", type); - generator.writeStringField("name", name); - writeFieldIf(generator, "fieldName", fieldName); - generator.writeEndObject(); - } - } - - /** Collation element of a Druid "groupBy" query. */ - private static class JsonLimit implements Json { - final String type; - final Integer limit; - final ImmutableList<JsonCollation> collations; - - private JsonLimit(String type, Integer limit, ImmutableList<JsonCollation> collations) { - this.type = type; - this.limit = limit; - this.collations = collations; - } - - public void write(JsonGenerator generator) throws IOException { - generator.writeStartObject(); - generator.writeStringField("type", type); - writeFieldIf(generator, "limit", limit); - writeFieldIf(generator, "columns", collations); - generator.writeEndObject(); - } - } - - /** Collation element of a Druid "groupBy" query. */ - private static class JsonCollation implements Json { - final String dimension; - final String direction; - - private JsonCollation(String dimension, String direction) { - this.dimension = dimension; - this.direction = direction; - } - - public void write(JsonGenerator generator) throws IOException { - generator.writeStartObject(); - generator.writeStringField("dimension", dimension); - writeFieldIf(generator, "direction", direction); - generator.writeEndObject(); - } - } - - /** Aggregation element that calls the "cardinality" function. */ - private static class JsonCardinalityAggregation extends JsonAggregation { - final List<String> fieldNames; - - private JsonCardinalityAggregation(String type, String name, - List<String> fieldNames) { - super(type, name, null); - this.fieldNames = fieldNames; - } - - public void write(JsonGenerator generator) throws IOException { - generator.writeStartObject(); - generator.writeStringField("type", type); - generator.writeStringField("name", name); - writeFieldIf(generator, "fieldNames", fieldNames); - generator.writeEndObject(); - } - } - - /** Filter element of a Druid "groupBy" or "topN" query. */ - private abstract static class JsonFilter implements Json { - final String type; - - private JsonFilter(String type) { - this.type = type; - } - } - - /** Equality filter. */ - private static class JsonSelector extends JsonFilter { - private final String dimension; - private final String value; - - private JsonSelector(String type, String dimension, String value) { - super(type); - this.dimension = dimension; - this.value = value; - } - - public void write(JsonGenerator generator) throws IOException { - generator.writeStartObject(); - generator.writeStringField("type", type); - generator.writeStringField("dimension", dimension); - generator.writeStringField("value", value); - generator.writeEndObject(); - } - } - - /** Bound filter. */ - private static class JsonBound extends JsonFilter { - private final String dimension; - private final String lower; - private final boolean lowerStrict; - private final String upper; - private final boolean upperStrict; - private final boolean alphaNumeric; - - private JsonBound(String type, String dimension, String lower, - boolean lowerStrict, String upper, boolean upperStrict, - boolean alphaNumeric) { - super(type); - this.dimension = dimension; - this.lower = lower; - this.lowerStrict = lowerStrict; - this.upper = upper; - this.upperStrict = upperStrict; - this.alphaNumeric = alphaNumeric; - } - - public void write(JsonGenerator generator) throws IOException { - generator.writeStartObject(); - generator.writeStringField("type", type); - generator.writeStringField("dimension", dimension); - if (lower != null) { - generator.writeStringField("lower", lower); - generator.writeBooleanField("lowerStrict", lowerStrict); - } - if (upper != null) { - generator.writeStringField("upper", upper); - generator.writeBooleanField("upperStrict", upperStrict); - } - generator.writeBooleanField("alphaNumeric", alphaNumeric); - generator.writeEndObject(); - } - } - - /** Filter that combines other filters using a boolean operator. */ - private static class JsonCompositeFilter extends JsonFilter { - private final List<? extends JsonFilter> fields; - - private JsonCompositeFilter(String type, - List<? extends JsonFilter> fields) { - super(type); - this.fields = fields; - } - - public void write(JsonGenerator generator) throws IOException { - generator.writeStartObject(); - generator.writeStringField("type", type); - switch (type) { - case "NOT": - writeField(generator, "field", fields.get(0)); - break; - default: - writeField(generator, "fields", fields); - } - generator.writeEndObject(); - } - } - -} - -// End DruidQuery.java \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java deleted file mode 100644 index 228b307..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.optimizer.calcite.druid; - -/** - * Type of Druid query. - * - * TODO: to be removed when Calcite is upgraded to 1.9 - */ -public enum DruidQueryType { - SELECT("select"), - TOP_N("topN"), - GROUP_BY("groupBy"), - TIMESERIES("timeseries"); - - private final String queryName; - - private DruidQueryType(String queryName) { - this.queryName = queryName; - } - - public String getQueryName() { - return this.queryName; - } -} - -// End QueryType.java \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java deleted file mode 100644 index f68ffa5..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java +++ /dev/null @@ -1,591 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.optimizer.calcite.druid; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.calcite.plan.RelOptRule; -import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.rel.RelFieldCollation; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Aggregate; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.Sort; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexShuttle; -import org.apache.calcite.rex.RexUtil; -import org.apache.calcite.rex.RexVisitorImpl; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.util.ImmutableBitSet; -import org.apache.calcite.util.Pair; -import org.apache.calcite.util.Util; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveDateGranularity; -import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectSortTransposeRule; -import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortProjectTransposeRule; -import org.joda.time.Interval; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - -/** - * Rules and relational operators for {@link DruidQuery}. - * - * TODO: to be removed when Calcite is upgraded to 1.9 - */ -public class DruidRules { - - protected static final Logger LOG = LoggerFactory.getLogger(DruidRules.class); - - // Avoid instantiation - private DruidRules() { - } - - public static final DruidFilterRule FILTER = new DruidFilterRule(); - public static final DruidProjectRule PROJECT = new DruidProjectRule(); - public static final DruidAggregateRule AGGREGATE = new DruidAggregateRule(); - public static final DruidProjectAggregateRule PROJECT_AGGREGATE = new DruidProjectAggregateRule(); - public static final DruidSortRule SORT = new DruidSortRule(); - public static final DruidProjectSortRule PROJECT_SORT = new DruidProjectSortRule(); - public static final DruidSortProjectRule SORT_PROJECT = new DruidSortProjectRule(); - - /** Predicate that returns whether Druid can not handle an aggregate. */ - private static final Predicate<AggregateCall> BAD_AGG = new Predicate<AggregateCall>() { - public boolean apply(AggregateCall aggregateCall) { - switch (aggregateCall.getAggregation().getKind()) { - case COUNT: - case SUM: - case SUM0: - case MIN: - case MAX: - return false; - default: - return true; - } - } - }; - - /** - * Rule to push a {@link org.apache.calcite.rel.core.Filter} into a {@link DruidQuery}. - */ - private static class DruidFilterRule extends RelOptRule { - private DruidFilterRule() { - super(operand(Filter.class, - operand(DruidQuery.class, none()))); - } - - public void onMatch(RelOptRuleCall call) { - final Filter filter = call.rel(0); - final DruidQuery query = call.rel(1); - if (!DruidQuery.isValidSignature(query.signature() + 'f') - || !query.isValidFilter(filter.getCondition())) { - return; - } - // Timestamp - int timestampFieldIdx = -1; - for (int i = 0; i < query.getRowType().getFieldCount(); i++) { - if (DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals( - query.getRowType().getFieldList().get(i).getName())) { - timestampFieldIdx = i; - break; - } - } - final Pair<List<RexNode>, List<RexNode>> pair = splitFilters( - filter.getCluster().getRexBuilder(), query, filter.getCondition(), timestampFieldIdx); - if (pair == null) { - // We can't push anything useful to Druid. - return; - } - List<Interval> intervals = null; - if (!pair.left.isEmpty()) { - intervals = DruidIntervalUtils.createInterval( - query.getRowType().getFieldList().get(timestampFieldIdx).getType(), - pair.left); - if (intervals == null) { - // We can't push anything useful to Druid. - return; - } - } - DruidQuery newDruidQuery = query; - if (!pair.right.isEmpty()) { - if (!validConditions(pair.right)) { - return; - } - final RelNode newFilter = filter.copy(filter.getTraitSet(), Util.last(query.rels), - RexUtil.composeConjunction(filter.getCluster().getRexBuilder(), pair.right, false)); - newDruidQuery = DruidQuery.extendQuery(query, newFilter); - } - if (intervals != null) { - newDruidQuery = DruidQuery.extendQuery(newDruidQuery, intervals); - } - call.transformTo(newDruidQuery); - } - - /* Splits the filter condition in two groups: those that filter on the timestamp column - * and those that filter on other fields */ - private static Pair<List<RexNode>, List<RexNode>> splitFilters(final RexBuilder rexBuilder, - final DruidQuery input, RexNode cond, final int timestampFieldIdx) { - final List<RexNode> timeRangeNodes = new ArrayList<>(); - final List<RexNode> otherNodes = new ArrayList<>(); - List<RexNode> conjs = RelOptUtil.conjunctions(cond); - if (conjs.isEmpty()) { - // We do not transform - return null; - } - // Number of columns with the dimensions and timestamp - int max = input.getRowType().getFieldCount() - input.druidTable.metricFieldNames.size(); - for (RexNode conj : conjs) { - final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor(); - conj.accept(visitor); - if (visitor.inputPosReferenced.contains(timestampFieldIdx)) { - if (visitor.inputPosReferenced.size() != 1) { - // Complex predicate, transformation currently not supported - return null; - } - timeRangeNodes.add(conj); - } else if (!visitor.inputPosReferenced.tailSet(max).isEmpty()) { - // Filter on metrics, not supported in Druid - return null; - } else { - otherNodes.add(conj); - } - } - return Pair.of(timeRangeNodes, otherNodes); - } - - /* Checks that all conditions are on ref + literal*/ - private static boolean validConditions(List<RexNode> nodes) { - for (RexNode node: nodes) { - try { - node.accept( - new RexVisitorImpl<Void>(true) { - @SuppressWarnings("incomplete-switch") - @Override public Void visitCall(RexCall call) { - switch (call.getKind()) { - case CAST: - // Only if on top of ref or literal - if (call.getOperands().get(0) instanceof RexInputRef || - call.getOperands().get(0) instanceof RexLiteral) { - break; - } - // Not supported - throw Util.FoundOne.NULL; - case EQUALS: - case LESS_THAN: - case LESS_THAN_OR_EQUAL: - case GREATER_THAN: - case GREATER_THAN_OR_EQUAL: - // Check cast - RexNode left = call.getOperands().get(0); - if (left.getKind() == SqlKind.CAST) { - left = ((RexCall)left).getOperands().get(0); - } - RexNode right = call.getOperands().get(1); - if (right.getKind() == SqlKind.CAST) { - right = ((RexCall)right).getOperands().get(0); - } - if (left instanceof RexInputRef && right instanceof RexLiteral) { - break; - } - if (right instanceof RexInputRef && left instanceof RexLiteral) { - break; - } - // Not supported if it is not ref + literal - throw Util.FoundOne.NULL; - case BETWEEN: - case IN: - // Not supported here yet - throw Util.FoundOne.NULL; - } - return super.visitCall(call); - } - }); - } catch (Util.FoundOne e) { - return false; - } - } - return true; - } - } - - /** - * Rule to push a {@link org.apache.calcite.rel.core.Project} into a {@link DruidQuery}. - */ - private static class DruidProjectRule extends RelOptRule { - private DruidProjectRule() { - super(operand(Project.class, - operand(DruidQuery.class, none()))); - } - - public void onMatch(RelOptRuleCall call) { - final Project project = call.rel(0); - final DruidQuery query = call.rel(1); - if (!DruidQuery.isValidSignature(query.signature() + 'p')) { - return; - } - - if (canProjectAll(project.getProjects())) { - // All expressions can be pushed to Druid in their entirety. - final RelNode newProject = project.copy(project.getTraitSet(), - ImmutableList.of(Util.last(query.rels))); - RelNode newNode = DruidQuery.extendQuery(query, newProject); - call.transformTo(newNode); - return; - } - final Pair<List<RexNode>, List<RexNode>> pair = splitProjects( - project.getCluster().getRexBuilder(), query, project.getProjects()); - if (pair == null) { - // We can't push anything useful to Druid. - return; - } - final List<RexNode> above = pair.left; - final List<RexNode> below = pair.right; - final RelDataTypeFactory.FieldInfoBuilder builder = project.getCluster().getTypeFactory() - .builder(); - final RelNode input = Util.last(query.rels); - for (RexNode e : below) { - final String name; - if (e instanceof RexInputRef) { - name = input.getRowType().getFieldNames().get(((RexInputRef) e).getIndex()); - } else { - name = null; - } - builder.add(name, e.getType()); - } - final RelNode newProject = project.copy(project.getTraitSet(), input, below, builder.build()); - final DruidQuery newQuery = DruidQuery.extendQuery(query, newProject); - final RelNode newProject2 = project.copy(project.getTraitSet(), newQuery, above, - project.getRowType()); - call.transformTo(newProject2); - } - - private static boolean canProjectAll(List<RexNode> nodes) { - for (RexNode e : nodes) { - if (!(e instanceof RexInputRef)) { - return false; - } - } - return true; - } - - private static Pair<List<RexNode>, List<RexNode>> splitProjects(final RexBuilder rexBuilder, - final RelNode input, List<RexNode> nodes) { - final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor(); - for (RexNode node : nodes) { - node.accept(visitor); - } - if (visitor.inputPosReferenced.size() == input.getRowType().getFieldCount()) { - // All inputs are referenced - return null; - } - final List<RexNode> belowNodes = new ArrayList<>(); - final List<RelDataType> belowTypes = new ArrayList<>(); - final List<Integer> positions = Lists.newArrayList(visitor.inputPosReferenced); - for (int i : positions) { - final RexNode node = rexBuilder.makeInputRef(input, i); - belowNodes.add(node); - belowTypes.add(node.getType()); - } - final List<RexNode> aboveNodes = new ArrayList<>(); - for (RexNode node : nodes) { - aboveNodes.add(node.accept(new RexShuttle() { - @Override - public RexNode visitInputRef(RexInputRef ref) { - final int index = positions.indexOf(ref.getIndex()); - return rexBuilder.makeInputRef(belowTypes.get(index), index); - } - })); - } - return Pair.of(aboveNodes, belowNodes); - } - } - - /** - * Rule to push an {@link org.apache.calcite.rel.core.Aggregate} into a {@link DruidQuery}. - */ - private static class DruidAggregateRule extends RelOptRule { - private DruidAggregateRule() { - super(operand(Aggregate.class, - operand(DruidQuery.class, none()))); - } - - public void onMatch(RelOptRuleCall call) { - final Aggregate aggregate = call.rel(0); - final DruidQuery query = call.rel(1); - if (!DruidQuery.isValidSignature(query.signature() + 'a')) { - return; - } - if (aggregate.indicator - || aggregate.getGroupSets().size() != 1 - || Iterables.any(aggregate.getAggCallList(), BAD_AGG) - || !validAggregate(aggregate, query)) { - return; - } - final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(), - ImmutableList.of(Util.last(query.rels))); - call.transformTo(DruidQuery.extendQuery(query, newAggregate)); - } - - /* Check whether agg functions reference timestamp */ - private static boolean validAggregate(Aggregate aggregate, DruidQuery query) { - ImmutableBitSet.Builder builder = ImmutableBitSet.builder(); - for (AggregateCall aggCall : aggregate.getAggCallList()) { - builder.addAll(aggCall.getArgList()); - } - return !checkTimestampRefOnQuery(builder.build(), query.getTopNode()); - } - } - - /** - * Rule to push an {@link org.apache.calcite.rel.core.Aggregate} and - * {@link org.apache.calcite.rel.core.Project} into a {@link DruidQuery}. - */ - private static class DruidProjectAggregateRule extends RelOptRule { - private DruidProjectAggregateRule() { - super(operand(Aggregate.class, - operand(Project.class, - operand(DruidQuery.class, none())))); - } - - public void onMatch(RelOptRuleCall call) { - final Aggregate aggregate = call.rel(0); - final Project project = call.rel(1); - final DruidQuery query = call.rel(2); - if (!DruidQuery.isValidSignature(query.signature() + 'p' + 'a')) { - return; - } - int timestampIdx; - if ((timestampIdx = validProject(project, query)) == -1) { - return; - } - if (aggregate.indicator - || aggregate.getGroupSets().size() != 1 - || Iterables.any(aggregate.getAggCallList(), BAD_AGG) - || !validAggregate(aggregate, timestampIdx)) { - return; - } - - final RelNode newProject = project.copy(project.getTraitSet(), - ImmutableList.of(Util.last(query.rels))); - final DruidQuery projectDruidQuery = DruidQuery.extendQuery(query, newProject); - final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(), - ImmutableList.of(Util.last(projectDruidQuery.rels))); - call.transformTo(DruidQuery.extendQuery(projectDruidQuery, newAggregate)); - } - - /* To be a valid Project, we allow it to contain references, and a single call - * to an EXTRACT function on the timestamp column. Returns the reference to - * the timestamp, if any. */ - private static int validProject(Project project, DruidQuery query) { - List<RexNode> nodes = project.getProjects(); - int idxTimestamp = -1; - for (int i = 0; i < nodes.size(); i++) { - final RexNode e = nodes.get(i); - if (e instanceof RexCall) { - // It is a call, check that it is EXTRACT and follow-up conditions - final RexCall call = (RexCall) e; - if (!HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator())) { - return -1; - } - if (idxTimestamp != -1) { - // Already one usage of timestamp column - return -1; - } - if (!(call.getOperands().get(0) instanceof RexInputRef)) { - return -1; - } - final RexInputRef ref = (RexInputRef) call.getOperands().get(0); - if (!(checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()), query.getTopNode()))) { - return -1; - } - idxTimestamp = i; - continue; - } - if (!(e instanceof RexInputRef)) { - // It needs to be a reference - return -1; - } - final RexInputRef ref = (RexInputRef) e; - if (checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()), query.getTopNode())) { - if (idxTimestamp != -1) { - // Already one usage of timestamp column - return -1; - } - idxTimestamp = i; - } - } - return idxTimestamp; - } - - private static boolean validAggregate(Aggregate aggregate, int idx) { - if (!aggregate.getGroupSet().get(idx)) { - return false; - } - for (AggregateCall aggCall : aggregate.getAggCallList()) { - if (aggCall.getArgList().contains(idx)) { - return false; - } - } - return true; - } - } - - /** - * Rule to push an {@link org.apache.calcite.rel.core.Sort} through a - * {@link org.apache.calcite.rel.core.Project}. Useful to transform - * to complex Druid queries. - */ - private static class DruidProjectSortRule extends HiveSortProjectTransposeRule { - private DruidProjectSortRule() { - super(operand(Sort.class, - operand(Project.class, - operand(DruidQuery.class, none())))); - } - - @Override - public boolean matches(RelOptRuleCall call) { - return true; - } - - } - - /** - * Rule to push back {@link org.apache.calcite.rel.core.Project} through a - * {@link org.apache.calcite.rel.core.Sort}. Useful if after pushing Sort, - * we could not push it inside DruidQuery. - */ - private static class DruidSortProjectRule extends HiveProjectSortTransposeRule { - private DruidSortProjectRule() { - super(operand(Project.class, - operand(Sort.class, - operand(DruidQuery.class, none())))); - } - } - - /** - * Rule to push an {@link org.apache.calcite.rel.core.Aggregate} into a {@link DruidQuery}. - */ - private static class DruidSortRule extends RelOptRule { - private DruidSortRule() { - super(operand(Sort.class, - operand(DruidQuery.class, none()))); - } - - public void onMatch(RelOptRuleCall call) { - final Sort sort = call.rel(0); - final DruidQuery query = call.rel(1); - if (!DruidQuery.isValidSignature(query.signature() + 'l')) { - return; - } - // Either it is: - // - a sort without limit on the time column on top of - // Agg operator (transformable to timeseries query), or - // - it is a sort w/o limit on columns that do not include - // the time column on top of Agg operator, or - // - a simple limit on top of other operator than Agg - if (!validSortLimit(sort, query)) { - return; - } - final RelNode newSort = sort.copy(sort.getTraitSet(), - ImmutableList.of(Util.last(query.rels))); - call.transformTo(DruidQuery.extendQuery(query, newSort)); - } - - /* Check sort valid */ - private static boolean validSortLimit(Sort sort, DruidQuery query) { - if (sort.offset != null && RexLiteral.intValue(sort.offset) != 0) { - // offset not supported by Druid - return false; - } - if (query.getTopNode() instanceof Aggregate) { - final Aggregate topAgg = (Aggregate) query.getTopNode(); - final ImmutableBitSet.Builder positionsReferenced = ImmutableBitSet.builder(); - int metricsRefs = 0; - for (RelFieldCollation col : sort.collation.getFieldCollations()) { - int idx = col.getFieldIndex(); - if (idx >= topAgg.getGroupCount()) { - metricsRefs++; - continue; - } - positionsReferenced.set(topAgg.getGroupSet().nth(idx)); - } - boolean refsTimestamp = - checkTimestampRefOnQuery(positionsReferenced.build(), topAgg.getInput()); - if (refsTimestamp && metricsRefs != 0) { - return false; - } - return true; - } - // If it is going to be a Druid select operator, we push the limit iff - // 1) it does not contain a sort specification (required by Druid) and - // 2) limit is smaller than select threshold, as otherwise it might be - // better to obtain some parallelization and let global limit - // optimizer kick in - HiveDruidConf conf = sort.getCluster().getPlanner() - .getContext().unwrap(HiveDruidConf.class); - return HiveCalciteUtil.pureLimitRelNode(sort) && - RexLiteral.intValue(sort.fetch) <= conf.getSelectThreshold(); - } - } - - /* Check if any of the references leads to the timestamp column */ - private static boolean checkTimestampRefOnQuery(ImmutableBitSet set, RelNode top) { - if (top instanceof Project) { - ImmutableBitSet.Builder newSet = ImmutableBitSet.builder(); - final Project project = (Project) top; - for (int index : set) { - RexNode node = project.getProjects().get(index); - if (node instanceof RexInputRef) { - newSet.set(((RexInputRef)node).getIndex()); - } else if (node instanceof RexCall) { - RexCall call = (RexCall) node; - assert HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator()); - newSet.set(((RexInputRef)call.getOperands().get(0)).getIndex()); - } - } - top = project.getInput(); - set = newSet.build(); - } - - // Check if any references the timestamp column - for (int index : set) { - if (DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(top.getRowType().getFieldNames().get(index))) { - return true; - } - } - - return false; - } - -} - -// End DruidRules.java \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java deleted file mode 100644 index 3b3f68a..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.optimizer.calcite.druid; - -import java.util.Map; - -import org.apache.calcite.schema.Table; -import org.apache.calcite.schema.impl.AbstractSchema; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; - -/** - * Schema mapped onto a Druid instance. - * - * TODO: to be removed when Calcite is upgraded to 1.9 - */ -public class DruidSchema extends AbstractSchema { - final String url; - - /** - * Creates a Druid schema. - * - * @param url URL of query REST service - */ - public DruidSchema(String url) { - this.url = Preconditions.checkNotNull(url); - } - - @Override protected Map<String, Table> getTableMap() { - final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder(); - return builder.build(); - } -} - -// End DruidSchema.java \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java deleted file mode 100644 index 7288291..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.optimizer.calcite.druid; - -import java.util.List; -import java.util.Set; - -import org.apache.calcite.interpreter.BindableConvention; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.TableScan; -import org.apache.calcite.rel.logical.LogicalTableScan; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.schema.TranslatableTable; -import org.apache.calcite.schema.impl.AbstractTable; -import org.joda.time.DateTime; -import org.joda.time.Interval; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; - -/** - * Table mapped onto a Druid table. - * - * TODO: to be removed when Calcite is upgraded to 1.9 - */ -public class DruidTable extends AbstractTable implements TranslatableTable { - - public static final String DEFAULT_TIMESTAMP_COLUMN = "__time"; - public static final Interval DEFAULT_INTERVAL = new Interval( - new DateTime("1900-01-01"), - new DateTime("3000-01-01") - ); - - final DruidSchema schema; - final String dataSource; - final RelDataType rowType; - final RelProtoDataType protoRowType; - final ImmutableSet<String> metricFieldNames; - final ImmutableList<Interval> intervals; - final String timestampFieldName; - - /** - * Creates a Druid table. - * - * @param schema Druid schema that contains this table - * @param dataSource Druid data source name - * @param protoRowType Field names and types - * @param metricFieldNames Names of fields that are metrics - * @param interval Default interval if query does not constrain the time - * @param timestampFieldName Name of the column that contains the time - */ - public DruidTable(DruidSchema schema, String dataSource, - RelProtoDataType protoRowType, Set<String> metricFieldNames, - List<Interval> intervals, String timestampFieldName) { - this.schema = Preconditions.checkNotNull(schema); - this.dataSource = Preconditions.checkNotNull(dataSource); - this.rowType = null; - this.protoRowType = protoRowType; - this.metricFieldNames = ImmutableSet.copyOf(metricFieldNames); - this.intervals = ImmutableList.copyOf(intervals); - this.timestampFieldName = Preconditions.checkNotNull(timestampFieldName); - } - - public DruidTable(DruidSchema schema, String dataSource, - RelDataType rowType, Set<String> metricFieldNames, - List<Interval> intervals, String timestampFieldName) { - this.schema = Preconditions.checkNotNull(schema); - this.dataSource = Preconditions.checkNotNull(dataSource); - this.rowType = Preconditions.checkNotNull(rowType); - this.protoRowType = null; - this.metricFieldNames = ImmutableSet.copyOf(metricFieldNames); - this.intervals = ImmutableList.copyOf(intervals); - this.timestampFieldName = Preconditions.checkNotNull(timestampFieldName); - } - - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - final RelDataType thisRowType; - if (rowType != null) { - thisRowType = rowType; - } else { - // Generate - thisRowType = protoRowType.apply(typeFactory); - } - final List<String> fieldNames = thisRowType.getFieldNames(); - Preconditions.checkArgument(fieldNames.contains(timestampFieldName)); - Preconditions.checkArgument(fieldNames.containsAll(metricFieldNames)); - return thisRowType; - } - - public RelNode toRel(RelOptTable.ToRelContext context, - RelOptTable relOptTable) { - final RelOptCluster cluster = context.getCluster(); - final TableScan scan = LogicalTableScan.create(cluster, relOptTable); - return DruidQuery.create(cluster, - cluster.traitSetOf(BindableConvention.INSTANCE), relOptTable, this, - ImmutableList.<RelNode>of(scan)); - } - -} - -// End DruidTable.java \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java deleted file mode 100644 index 0686dff..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.optimizer.calcite.druid; - -public class HiveDruidConf { - - private int selectThreshold; - - - public HiveDruidConf(int selectThreshold) { - this.selectThreshold = selectThreshold; - } - - public int getSelectThreshold() { - return selectThreshold; - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java index dc6b152..6df6026 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java @@ -35,7 +35,6 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.ImmutableBitSet; -import org.apache.calcite.util.IntList; import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil; import com.google.common.collect.Sets; @@ -90,7 +89,7 @@ public class HiveAggregate extends Aggregate implements HiveRelNode { final RelDataType inputRowType, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, final List<AggregateCall> aggCalls) { - final IntList groupList = groupSet.toList(); + final List<Integer> groupList = groupSet.asList(); assert groupList.size() == groupSet.cardinality(); final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder(); final List<RelDataTypeField> fieldList = inputRowType.getFieldList(); http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java deleted file mode 100644 index b3f8d9b..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators; - -import java.util.Set; - -import org.apache.calcite.sql.SqlFunction; -import org.apache.calcite.sql.SqlFunctionCategory; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.type.OperandTypes; -import org.apache.calcite.sql.type.ReturnTypes; - -import com.google.common.collect.Sets; - -public class HiveDateGranularity extends SqlFunction { - - public static final SqlFunction YEAR = new HiveDateGranularity("YEAR"); - public static final SqlFunction QUARTER = new HiveDateGranularity("QUARTER"); - public static final SqlFunction MONTH = new HiveDateGranularity("MONTH"); - public static final SqlFunction WEEK = new HiveDateGranularity("WEEK"); - public static final SqlFunction DAY = new HiveDateGranularity("DAY"); - public static final SqlFunction HOUR = new HiveDateGranularity("HOUR"); - public static final SqlFunction MINUTE = new HiveDateGranularity("MINUTE"); - public static final SqlFunction SECOND = new HiveDateGranularity("SECOND"); - - public static final Set<SqlFunction> ALL_FUNCTIONS = - Sets.newHashSet(YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND); - - private HiveDateGranularity(String name) { - super( - name, - SqlKind.OTHER_FUNCTION, - ReturnTypes.TIME_NULLABLE, - null, - OperandTypes.ANY, - SqlFunctionCategory.TIMEDATE); - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveExtractDate.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveExtractDate.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveExtractDate.java new file mode 100644 index 0000000..4edc4df --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveExtractDate.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators; + +import java.util.Set; + +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; + +import com.google.common.collect.Sets; + +public class HiveExtractDate extends SqlFunction { + + public static final SqlFunction YEAR = new HiveExtractDate("YEAR"); + public static final SqlFunction QUARTER = new HiveExtractDate("QUARTER"); + public static final SqlFunction MONTH = new HiveExtractDate("MONTH"); + public static final SqlFunction WEEK = new HiveExtractDate("WEEK"); + public static final SqlFunction DAY = new HiveExtractDate("DAY"); + public static final SqlFunction HOUR = new HiveExtractDate("HOUR"); + public static final SqlFunction MINUTE = new HiveExtractDate("MINUTE"); + public static final SqlFunction SECOND = new HiveExtractDate("SECOND"); + + public static final Set<SqlFunction> ALL_FUNCTIONS = + Sets.newHashSet(YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND); + + private HiveExtractDate(String name) { + super(name, SqlKind.EXTRACT, ReturnTypes.INTEGER_NULLABLE, null, + OperandTypes.INTERVALINTERVAL_INTERVALDATETIME, + SqlFunctionCategory.SYSTEM); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFloorDate.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFloorDate.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFloorDate.java new file mode 100644 index 0000000..3d104ef --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFloorDate.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators; + +import java.util.Set; + +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.fun.SqlMonotonicUnaryFunction; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.validate.SqlMonotonicity; + +import com.google.common.collect.Sets; + +public class HiveFloorDate extends SqlMonotonicUnaryFunction { + + public static final SqlFunction YEAR = new HiveFloorDate("FLOOR_YEAR"); + public static final SqlFunction QUARTER = new HiveFloorDate("FLOOR_QUARTER"); + public static final SqlFunction MONTH = new HiveFloorDate("FLOOR_MONTH"); + public static final SqlFunction WEEK = new HiveFloorDate("FLOOR_WEEK"); + public static final SqlFunction DAY = new HiveFloorDate("FLOOR_DAY"); + public static final SqlFunction HOUR = new HiveFloorDate("FLOOR_HOUR"); + public static final SqlFunction MINUTE = new HiveFloorDate("FLOOR_MINUTE"); + public static final SqlFunction SECOND = new HiveFloorDate("FLOOR_SECOND"); + + public static final Set<SqlFunction> ALL_FUNCTIONS = + Sets.newHashSet(YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND); + + private HiveFloorDate(String name) { + super(name, SqlKind.FLOOR, ReturnTypes.ARG0_OR_EXACT_NO_SCALE, null, + OperandTypes.sequence( + "'" + SqlKind.FLOOR + "(<DATE> TO <TIME_UNIT>)'\n" + + "'" + SqlKind.FLOOR + "(<TIME> TO <TIME_UNIT>)'\n" + + "'" + SqlKind.FLOOR + "(<TIMESTAMP> TO <TIME_UNIT>)'", + OperandTypes.DATETIME, + OperandTypes.ANY), + SqlFunctionCategory.NUMERIC); + } + + @Override + public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) { + // Monotonic iff its first argument is, but not strict. + return call.getOperandMonotonicity(0).unstrict(); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateJoinTransposeRule.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateJoinTransposeRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateJoinTransposeRule.java index e9a4d88..87e755c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateJoinTransposeRule.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateJoinTransposeRule.java @@ -133,9 +133,10 @@ public class HiveAggregateJoinTransposeRule extends AggregateJoinTransposeRule { // Split join condition final List<Integer> leftKeys = Lists.newArrayList(); final List<Integer> rightKeys = Lists.newArrayList(); + final List<Boolean> filterNulls = Lists.newArrayList(); RexNode nonEquiConj = RelOptUtil.splitJoinCondition(join.getLeft(), join.getRight(), - join.getCondition(), leftKeys, rightKeys); + join.getCondition(), leftKeys, rightKeys, filterNulls); // If it contains non-equi join conditions, we bail out if (!nonEquiConj.isAlwaysTrue()) { return; @@ -271,7 +272,8 @@ public class HiveAggregateJoinTransposeRule extends AggregateJoinTransposeRule { RelOptUtil.areRowTypesEqual(r.getRowType(), aggregate.getRowType(), false)) { // no need to aggregate } else { - r = RelOptUtil.createProject(r, projects, null, true, projectFactory); + r = RelOptUtil.createProject(r, projects, null, true, + relBuilderFactory.create(aggregate.getCluster(), null)); if (allColumnsInAggregate) { // let's see if we can convert List<RexNode> projects2 = new ArrayList<>(); @@ -290,7 +292,8 @@ public class HiveAggregateJoinTransposeRule extends AggregateJoinTransposeRule { if (projects2.size() == aggregate.getGroupSet().cardinality() + newAggCalls.size()) { // We successfully converted agg calls into projects. - r = RelOptUtil.createProject(r, projects2, null, true, projectFactory); + r = RelOptUtil.createProject(r, projects2, null, true, + relBuilderFactory.create(aggregate.getCluster(), null)); break b; } } http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateProjectMergeRule.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateProjectMergeRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateProjectMergeRule.java index 8af8a0d..c243266 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateProjectMergeRule.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateProjectMergeRule.java @@ -141,7 +141,8 @@ public class HiveAggregateProjectMergeRule extends RelOptRule { i < newAggregate.getRowType().getFieldCount(); i++) { posList.add(i); } - rel = HiveRelOptUtil.createProject(HiveRelFactories.HIVE_PROJECT_FACTORY, + rel = HiveRelOptUtil.createProject( + HiveRelFactories.HIVE_BUILDER.create(aggregate.getCluster(), null), rel, posList); }