[CALCITE-2170] Use Druid Expressions capabilities to improve the amount of work that can be pushed to Druid
Close apache/calcite#624 Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/98f3704e Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/98f3704e Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/98f3704e Branch: refs/heads/master Commit: 98f3704ea4536d6ead6465376bd02139b889f6e9 Parents: 707f4de Author: Christian Tzolov <[email protected]> Authored: Thu Nov 3 06:30:04 2016 +0100 Committer: Jesus Camacho Rodriguez <[email protected]> Committed: Fri Feb 16 19:16:58 2018 -0800 ---------------------------------------------------------------------- .../adapter/druid/BinaryOperatorConversion.java | 64 + .../adapter/druid/CeilOperatorConversion.java | 77 + .../adapter/druid/DefaultDimensionSpec.java | 29 +- .../calcite/adapter/druid/DimensionSpec.java | 8 +- .../adapter/druid/DirectOperatorConversion.java | 55 + .../adapter/druid/DruidConnectionImpl.java | 49 +- .../adapter/druid/DruidDateTimeUtils.java | 68 + .../calcite/adapter/druid/DruidExpressions.java | 283 +++ .../apache/calcite/adapter/druid/DruidJson.java | 29 + .../calcite/adapter/druid/DruidJsonFilter.java | 642 +++++ .../calcite/adapter/druid/DruidQuery.java | 2114 ++++++++-------- .../adapter/druid/DruidResultEnumerator.java | 25 - .../calcite/adapter/druid/DruidRules.java | 626 +---- .../adapter/druid/DruidSqlCastConverter.java | 152 ++ .../druid/DruidSqlOperatorConverter.java | 49 + .../apache/calcite/adapter/druid/DruidType.java | 16 +- .../druid/ExtractOperatorConversion.java | 80 + .../adapter/druid/ExtractionDimensionSpec.java | 50 +- .../adapter/druid/ExtractionFunction.java | 2 +- .../adapter/druid/FloorOperatorConversion.java | 74 + .../calcite/adapter/druid/Granularities.java | 4 +- .../calcite/adapter/druid/Granularity.java | 2 +- .../adapter/druid/NaryOperatorConverter.java | 60 + .../druid/SubstringOperatorConversion.java | 63 + .../druid/TimeExtractionDimensionSpec.java | 75 - 
.../adapter/druid/TimeExtractionFunction.java | 63 +- .../druid/UnaryPrefixOperatorConversion.java | 63 + .../druid/UnarySuffixOperatorConversion.java | 62 + .../calcite/adapter/druid/VirtualColumn.java | 100 + .../adapter/druid/DruidQueryFilterTest.java | 47 +- .../org/apache/calcite/test/DruidAdapterIT.java | 2367 ++++++++++++------ 31 files changed, 4880 insertions(+), 2518 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/BinaryOperatorConversion.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/BinaryOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/BinaryOperatorConversion.java new file mode 100644 index 0000000..d10c147 --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/BinaryOperatorConversion.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.druid; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; + +import java.util.List; + +/** + * Binary operator conversion utility class used to convert expression like exp1 Operator exp2 + */ +public class BinaryOperatorConversion implements DruidSqlOperatorConverter { + private final SqlOperator operator; + private final String druidOperator; + + public BinaryOperatorConversion(final SqlOperator operator, final String druidOperator) { + this.operator = operator; + this.druidOperator = druidOperator; + } + + @Override public SqlOperator calciteOperator() { + return operator; + } + + @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType, + DruidQuery druidQuery) { + + final RexCall call = (RexCall) rexNode; + + final List<String> druidExpressions = DruidExpressions.toDruidExpressions( + druidQuery, rowType, + call.getOperands()); + if (druidExpressions == null) { + return null; + } + if (druidExpressions.size() != 2) { + throw new IllegalStateException( + DruidQuery.format("Got binary operator[%s] with %s args?", operator.getName(), + druidExpressions.size())); + } + + return DruidQuery + .format("(%s %s %s)", druidExpressions.get(0), druidOperator, druidExpressions.get(1)); + } +} + +// End BinaryOperatorConversion.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java new file mode 100644 index 0000000..7f15307 --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java @@ -0,0 +1,77 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.druid; + +import org.apache.calcite.avatica.util.TimeUnitRange; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; + +import java.util.TimeZone; + +import javax.annotation.Nullable; + +/** + * DruidSqlOperatorConverter implementation that handles Ceil operations conversions + */ +public class CeilOperatorConversion implements DruidSqlOperatorConverter { + @Override public SqlOperator calciteOperator() { + return SqlStdOperatorTable.CEIL; + } + + @Nullable + @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType, + DruidQuery query) { + final RexCall call = (RexCall) rexNode; + final RexNode arg = call.getOperands().get(0); + final String druidExpression = DruidExpressions.toDruidExpression( + arg, + rowType, + query); + if (druidExpression == null) { + return null; + } else if (call.getOperands().size() == 1) { + // case CEIL(expr) + return DruidQuery.format("ceil(%s)", druidExpression); + } else if 
(call.getOperands().size() == 2) { + // CEIL(expr TO timeUnit) + final RexLiteral flag = (RexLiteral) call.getOperands().get(1); + final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue(); + final Granularity.Type type = DruidDateTimeUtils.toDruidGranularity(timeUnit); + if (type == null) { + // Unknown Granularity bail out + return null; + } + String isoPeriodFormat = DruidDateTimeUtils.toISOPeriodFormat(type); + if (isoPeriodFormat == null) { + return null; + } + return DruidExpressions.applyTimestampCeil( + druidExpression, + isoPeriodFormat, + "", + TimeZone.getTimeZone(query.getConnectionConfig().timeZone())); + } else { + return null; + } + } +} + +// End CeilOperatorConversion.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java index 015edff..28f99da 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java @@ -17,6 +17,7 @@ package org.apache.calcite.adapter.druid; import com.fasterxml.jackson.core.JsonGenerator; +import com.google.common.base.Preconditions; import java.io.IOException; @@ -29,17 +30,43 @@ import java.io.IOException; public class DefaultDimensionSpec implements DimensionSpec { private final String dimension; + private final String outputName; + private final DruidType outputType; + + public DefaultDimensionSpec(String dimension, String outputName, DruidType outputType) { + this.dimension = Preconditions.checkNotNull(dimension); + this.outputName = Preconditions.checkNotNull(outputName); + this.outputType = outputType == null ? 
DruidType.STRING : outputType; + } public DefaultDimensionSpec(String dimension) { - this.dimension = dimension; + this(dimension, dimension, null); } @Override public void write(JsonGenerator generator) throws IOException { generator.writeStartObject(); generator.writeStringField("type", "default"); generator.writeStringField("dimension", dimension); + generator.writeStringField("outputName", outputName); + generator.writeStringField("outputType", outputType.name()); generator.writeEndObject(); } + + @Override public String getOutputName() { + return outputName; + } + + @Override public DruidType getOutputType() { + return outputType; + } + + @Override public ExtractionFunction getExtractionFn() { + return null; + } + + @Override public String getDimension() { + return dimension; + } } // End DefaultDimensionSpec.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java index 45625c3..14c02e6 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java @@ -16,12 +16,18 @@ */ package org.apache.calcite.adapter.druid; +import javax.annotation.Nullable; + /** * Interface for Druid DimensionSpec. * * <p>DimensionSpecs define how dimension values get transformed prior to aggregation. 
*/ -public interface DimensionSpec extends DruidQuery.Json { +public interface DimensionSpec extends DruidJson { + String getOutputName(); + DruidType getOutputType(); + @Nullable ExtractionFunction getExtractionFn(); + String getDimension(); } // End DimensionSpec.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DirectOperatorConversion.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DirectOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DirectOperatorConversion.java new file mode 100644 index 0000000..c937e83 --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DirectOperatorConversion.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.druid; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; + +import java.util.List; + +/** + * Direct operator conversion for expression like Function(exp_1,...exp_n) + */ +public class DirectOperatorConversion implements DruidSqlOperatorConverter { + private final SqlOperator operator; + private final String druidFunctionName; + + public DirectOperatorConversion(final SqlOperator operator, final String druidFunctionName) { + this.operator = operator; + this.druidFunctionName = druidFunctionName; + } + + @Override public SqlOperator calciteOperator() { + return operator; + } + + @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType, + DruidQuery druidQuery) { + final RexCall call = (RexCall) rexNode; + final List<String> druidExpressions = DruidExpressions.toDruidExpressions( + druidQuery, rowType, + call.getOperands()); + if (druidExpressions == null) { + return null; + } + return DruidExpressions.functionCall(druidFunctionName, druidExpressions); + } +} + +// End DirectOperatorConversion.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java index 4f65dff..40883bf 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java @@ -38,6 +38,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.CollectionType; import 
com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -72,12 +73,15 @@ class DruidConnectionImpl implements DruidConnection { public static final String DEFAULT_RESPONSE_TIMESTAMP_COLUMN = "timestamp"; private static final SimpleDateFormat UTC_TIMESTAMP_FORMAT; + private static final SimpleDateFormat TIMESTAMP_FORMAT; static { final TimeZone utc = DateTimeUtils.UTC_ZONE; UTC_TIMESTAMP_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ROOT); UTC_TIMESTAMP_FORMAT.setTimeZone(utc); + TIMESTAMP_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT); + TIMESTAMP_FORMAT.setTimeZone(utc); } DruidConnectionImpl(String url, String coordinatorUrl) { @@ -132,6 +136,10 @@ class DruidConnectionImpl implements DruidConnection { int posTimestampField = -1; for (int i = 0; i < fieldTypes.size(); i++) { + /*@TODO This needs to be revisited. The logic seems to imply that only + one column of type timestamp is present; this is not necessarily true, + see https://issues.apache.org/jira/browse/CALCITE-2175 + */ if (fieldTypes.get(i) == ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP) { posTimestampField = i; break; @@ -324,30 +332,41 @@ class DruidConnectionImpl implements DruidConnection { } if (isTimestampColumn || ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP == type) { - try { - final long timeInMillis; - - if (token == JsonToken.VALUE_NUMBER_INT) { - timeInMillis = parser.getLongValue(); - } else { + final int fieldPos = posTimestampField != -1 ? posTimestampField : i; + if (token == JsonToken.VALUE_NUMBER_INT) { + rowBuilder.set(posTimestampField, parser.getLongValue()); + return; + } else { + // We don't have any way to figure out the format of time upfront since we only have + // org.apache.calcite.avatica.ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP as type to represent + // both timestamp and timestamp with local timezone. 
+ // Logic where type is inferred can be found at DruidQuery.DruidQueryNode.getPrimitive() + // Thus need to guess via try and catch + synchronized (UTC_TIMESTAMP_FORMAT) { // synchronized block to avoid race condition - synchronized (UTC_TIMESTAMP_FORMAT) { - timeInMillis = UTC_TIMESTAMP_FORMAT.parse(parser.getText()).getTime(); + try { + // First try to parse as Timestamp with timezone. + rowBuilder + .set(fieldPos, UTC_TIMESTAMP_FORMAT.parse(parser.getText()).getTime()); + } catch (ParseException e) { + // swallow the exception and try timestamp format + try { + rowBuilder + .set(fieldPos, TIMESTAMP_FORMAT.parse(parser.getText()).getTime()); + } catch (ParseException e2) { + // unknown format should not happen + Throwables.propagate(e2); + } } } - if (posTimestampField != -1) { - rowBuilder.set(posTimestampField, timeInMillis); - } - } catch (ParseException e) { - // ignore bad value + return; } - return; } switch (token) { case VALUE_NUMBER_INT: if (type == null) { - type = ColumnMetaData.Rep.INTEGER; + type = ColumnMetaData.Rep.LONG; } // fall through case VALUE_NUMBER_FLOAT: http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java index 2a9851a..91f5fa4 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java @@ -39,6 +39,7 @@ import com.google.common.collect.Range; import com.google.common.collect.TreeRangeSet; import org.joda.time.Interval; +import org.joda.time.Period; import org.joda.time.chrono.ISOChronology; import org.slf4j.Logger; @@ -46,6 +47,8 @@ import java.util.ArrayList; import java.util.List; import java.util.TimeZone; 
+import javax.annotation.Nullable; + /** * Utilities for generating intervals from RexNode. */ @@ -62,6 +65,7 @@ public class DruidDateTimeUtils { * expression. Assumes that all the predicates in the input * reference a single column: the timestamp column. */ + @Nullable public static List<Interval> createInterval(RexNode e, String timeZone) { final List<Range<TimestampString>> ranges = extractRanges(e, TimeZone.getTimeZone(timeZone), false); @@ -111,6 +115,7 @@ public class DruidDateTimeUtils { return intervals; } + @Nullable protected static List<Range<TimestampString>> extractRanges(RexNode node, TimeZone timeZone, boolean withNot) { switch (node.getKind()) { @@ -171,6 +176,7 @@ public class DruidDateTimeUtils { } } + @Nullable protected static List<Range<TimestampString>> leafToRanges(RexCall call, TimeZone timeZone, boolean withNot) { switch (call.getKind()) { @@ -249,6 +255,7 @@ public class DruidDateTimeUtils { } } + @Nullable protected static TimestampString literalValue(RexNode node, TimeZone timeZone) { switch (node.getKind()) { case LITERAL: @@ -318,6 +325,67 @@ public class DruidDateTimeUtils { return Granularities.createGranularity(timeUnit, timeZone); } + /** + * @param type Druid Granularity to translate as period of time + * + * @return String representing the granularity as ISO8601 Period of Time, null for unknown case. 
+ */ + @Nullable + public static String toISOPeriodFormat(Granularity.Type type) { + switch (type) { + case SECOND: + return Period.seconds(1).toString(); + case MINUTE: + return Period.minutes(1).toString(); + case HOUR: + return Period.hours(1).toString(); + case DAY: + return Period.days(1).toString(); + case WEEK: + return Period.weeks(1).toString(); + case MONTH: + return Period.months(1).toString(); + case QUARTER: + return Period.months(3).toString(); + case YEAR: + return Period.years(1).toString(); + default: + return null; + } + } + + /** + * Translates Calcite TimeUnitRange to Druid {@link Granularity} + * @param timeUnit Calcite Time unit to convert + * + * @return Druid Granularity or null + */ + @Nullable + public static Granularity.Type toDruidGranularity(TimeUnitRange timeUnit) { + if (timeUnit == null) { + return null; + } + switch (timeUnit) { + case YEAR: + return Granularity.Type.YEAR; + case QUARTER: + return Granularity.Type.QUARTER; + case MONTH: + return Granularity.Type.MONTH; + case WEEK: + return Granularity.Type.WEEK; + case DAY: + return Granularity.Type.DAY; + case HOUR: + return Granularity.Type.HOUR; + case MINUTE: + return Granularity.Type.MINUTE; + case SECOND: + return Granularity.Type.SECOND; + default: + return null; + } + } } // End DruidDateTimeUtils.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java new file mode 100644 index 0000000..78cfb0c --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.druid; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.BaseEncoding; +import com.google.common.primitives.Chars; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; + +import javax.annotation.Nullable; + +/** + * Expression utility class to transform Calcite expressions to Druid expressions when possible. + */ +public class DruidExpressions { + + /** + * Type mapping between Calcite SQL family types and native Druid expression types + */ + static final Map<SqlTypeName, DruidType> EXPRESSION_TYPES; + /** + * Druid expression safe chars, must be sorted. 
+ */ + private static final char[] SAFE_CHARS = " ,._-;:(){}[]<>!@#$%^&*`~?/".toCharArray(); + + static { + final ImmutableMap.Builder<SqlTypeName, DruidType> builder = ImmutableMap.builder(); + + for (SqlTypeName type : SqlTypeName.FRACTIONAL_TYPES) { + builder.put(type, DruidType.DOUBLE); + } + + for (SqlTypeName type : SqlTypeName.INT_TYPES) { + builder.put(type, DruidType.LONG); + } + + for (SqlTypeName type : SqlTypeName.STRING_TYPES) { + builder.put(type, DruidType.STRING); + } + // Timestamps are treated as longs (millis since the epoch) in Druid expressions. + builder.put(SqlTypeName.TIMESTAMP, DruidType.LONG); + builder.put(SqlTypeName.DATE, DruidType.LONG); + builder.put(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, DruidType.LONG); + builder.put(SqlTypeName.OTHER, DruidType.COMPLEX); + EXPRESSION_TYPES = builder.build(); + // Safe chars must be sorted + Arrays.sort(SAFE_CHARS); + } + private DruidExpressions() { + } + + + /** + * Translates Calcite rexNode to Druid Expression when possible + * @param rexNode rexNode to convert to a Druid Expression + * @param inputRowType input row type of the rexNode to translate + * @param druidRel Druid query + * + * @return Druid Expression or null when can not convert the RexNode + */ + @Nullable + public static String toDruidExpression( + final RexNode rexNode, + final RelDataType inputRowType, + final DruidQuery druidRel) { + SqlKind kind = rexNode.getKind(); + SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName(); + + if (kind == SqlKind.INPUT_REF) { + final RexInputRef ref = (RexInputRef) rexNode; + final String columnName = inputRowType.getFieldNames().get(ref.getIndex()); + if (columnName == null) { + return null; + } + if (druidRel.getDruidTable().timestampFieldName.equals(columnName)) { + return DruidExpressions.fromColumn(DruidTable.DEFAULT_TIMESTAMP_COLUMN); + } + return DruidExpressions.fromColumn(columnName); + } + + if (rexNode instanceof RexCall) { + final SqlOperator operator = ((RexCall) 
rexNode).getOperator(); + final DruidSqlOperatorConverter conversion = druidRel.getOperatorConversionMap() + .get(operator); + if (conversion == null) { + //unknown operator can not translate + return null; + } else { + return conversion.toDruidExpression(rexNode, inputRowType, druidRel); + } + } + if (kind == SqlKind.LITERAL) { + // Translate literal. + if (RexLiteral.isNullLiteral(rexNode)) { + //case the filter/project might yield to unknown let Calcite deal with this for now + return null; + } else if (SqlTypeName.NUMERIC_TYPES.contains(sqlTypeName)) { + return DruidExpressions.numberLiteral((Number) RexLiteral + .value(rexNode)); + } else if (SqlTypeFamily.INTERVAL_DAY_TIME == sqlTypeName.getFamily()) { + // Calcite represents DAY-TIME intervals in milliseconds. + final long milliseconds = ((Number) RexLiteral.value(rexNode)).longValue(); + return DruidExpressions.numberLiteral(milliseconds); + } else if (SqlTypeFamily.INTERVAL_YEAR_MONTH == sqlTypeName.getFamily()) { + // Calcite represents YEAR-MONTH intervals in months. + final long months = ((Number) RexLiteral.value(rexNode)).longValue(); + return DruidExpressions.numberLiteral(months); + } else if (SqlTypeName.STRING_TYPES.contains(sqlTypeName)) { + return + DruidExpressions.stringLiteral(RexLiteral.stringValue(rexNode)); + } else if (SqlTypeName.TIMESTAMP == sqlTypeName || SqlTypeName.DATE == sqlTypeName + || SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE == sqlTypeName) { + return DruidExpressions.numberLiteral(DruidDateTimeUtils + .literalValue(rexNode, TimeZone.getTimeZone(druidRel.getConnectionConfig().timeZone())) + .getMillisSinceEpoch()); + } else if (SqlTypeName.BOOLEAN == sqlTypeName) { + return DruidExpressions.numberLiteral(RexLiteral.booleanValue(rexNode) ? 1 : 0); + } + } + // Not Literal/InputRef/RexCall or unknown type? 
+ return null; + } + + public static String fromColumn(String columnName) { + return DruidQuery.format("\"%s\"", columnName); + } + + public static String nullLiteral() { + return "null"; + } + + public static String numberLiteral(final Number n) { + return n == null ? nullLiteral() : n.toString(); + } + + public static String stringLiteral(final String s) { + return s == null ? nullLiteral() : "'" + escape(s) + "'"; + } + + private static String escape(final String s) { + final StringBuilder escaped = new StringBuilder(); + for (int i = 0; i < s.length(); i++) { + final char c = s.charAt(i); + if (Character.isLetterOrDigit(c) || Arrays.binarySearch(SAFE_CHARS, c) >= 0) { + escaped.append(c); + } else { + escaped.append("\\u").append(BaseEncoding.base16().encode(Chars.toByteArray(c))); + } + } + return escaped.toString(); + } + + public static String functionCall(final String functionName, final List<String> args) { + Preconditions.checkNotNull(functionName, "druid functionName"); + Preconditions.checkNotNull(args, "args"); + + final StringBuilder builder = new StringBuilder(functionName); + builder.append("("); + for (int i = 0; i < args.size(); i++) { + final String arg = Preconditions.checkNotNull(args.get(i), "arg #%s", i); + builder.append(arg); + if (i < args.size() - 1) { + builder.append(","); + } + } + builder.append(")"); + return builder.toString(); + } + + public static String nAryOperatorCall(final String druidOperator, final List<String> args) { + Preconditions.checkNotNull(druidOperator, "druid operator missing"); + Preconditions.checkNotNull(args, "args"); + final StringBuilder builder = new StringBuilder(); + builder.append("("); + for (int i = 0; i < args.size(); i++) { + final String arg = Preconditions.checkNotNull(args.get(i), "arg #%s", i); + builder.append(arg); + if (i < args.size() - 1) { + builder.append(druidOperator); + } + } + builder.append(")"); + return builder.toString(); + } + + /** + * Translate a list of Calcite {@code RexNode} 
to Druid expressions. + * + * @param rexNodes list of Calcite expressions meant to be applied on top of the rows + * + * @return list of Druid expressions in the same order as rexNodes, or null if not possible. + * If a non-null list is returned, all elements will be non-null. + */ + @Nullable + public static List<String> toDruidExpressions( + final DruidQuery druidRel, final RelDataType rowType, + final List<RexNode> rexNodes) { + final List<String> retVal = new ArrayList<>(rexNodes.size()); + for (RexNode rexNode : rexNodes) { + final String druidExpression = toDruidExpression(rexNode, rowType, druidRel); + if (druidExpression == null) { + return null; + } + + retVal.add(druidExpression); + } + return retVal; + } + + public static String applyTimestampFloor( + final String input, + final String granularity, + final String origin, + final TimeZone timeZone) { + Preconditions.checkNotNull(input, "input"); + Preconditions.checkNotNull(granularity, "granularity"); + return DruidExpressions.functionCall( + "timestamp_floor", + ImmutableList.of(input, + DruidExpressions.stringLiteral(granularity), + DruidExpressions.stringLiteral(origin), + DruidExpressions.stringLiteral(timeZone.getID()))); + } + + public static String applyTimestampCeil( + final String input, + final String granularity, + final String origin, + final TimeZone timeZone) { + Preconditions.checkNotNull(input, "input"); + Preconditions.checkNotNull(granularity, "granularity"); + return DruidExpressions.functionCall( + "timestamp_ceil", + ImmutableList.of(input, + DruidExpressions.stringLiteral(granularity), + DruidExpressions.stringLiteral(origin), + DruidExpressions.stringLiteral(timeZone.getID()))); + } + + + public static String applyTimeExtract(String timeExpression, String druidUnit, + TimeZone timeZone) { + return DruidExpressions.functionCall( + "timestamp_extract", + ImmutableList.of( + timeExpression, + DruidExpressions.stringLiteral(druidUnit), + 
DruidExpressions.stringLiteral(timeZone.getID()))); + } +} + +// End DruidExpressions.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJson.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJson.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJson.java new file mode 100644 index 0000000..77ccf4f --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJson.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.druid; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; + +/** Object that knows how to write itself to a + * {@link com.fasterxml.jackson.core.JsonGenerator}. 
*/ +public interface DruidJson { + /** Writes this object to {@code generator}; an {@link IOException} may be thrown while writing. */ + void write(JsonGenerator generator) throws IOException; +} + +// End DruidJson.java http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java new file mode 100644 index 0000000..11ec2be --- /dev/null +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java @@ -0,0 +1,642 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.druid; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.TimestampString; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.Locale; +import java.util.TimeZone; + +import javax.annotation.Nullable; + + +/** + * Filter element of a Druid "groupBy" or "topN" query. + */ +abstract class DruidJsonFilter implements DruidJson { + + /** + * @param rexNode rexNode to translate to Druid Json Filter + * @param rowType rowType associated to rexNode + * @param druidQuery druid query + * + * @return Druid Json filter or null if it can not translate + */ + @Nullable + private static DruidJsonFilter toEqualityKindDruidFilter(RexNode rexNode, RelDataType rowType, + DruidQuery druidQuery) { + if (rexNode.getKind() != SqlKind.EQUALS && rexNode.getKind() != SqlKind.NOT_EQUALS) { + throw new AssertionError( + DruidQuery.format("Expecting EQUALS or NOT_EQUALS but got [%s]", rexNode.getKind())); + } + final RexCall rexCall = (RexCall) rexNode; + if (rexCall.getOperands().size() < 2) { + return null; + } + final RexLiteral rexLiteral; + final RexNode refNode; + final RexNode lhs = rexCall.getOperands().get(0); + final RexNode rhs = rexCall.getOperands().get(1); + if (lhs.getKind() == SqlKind.LITERAL && rhs.getKind() != SqlKind.LITERAL) { + rexLiteral = (RexLiteral) lhs; + 
refNode = rhs; + } else if (rhs.getKind() == SqlKind.LITERAL && lhs.getKind() != SqlKind.LITERAL) { + rexLiteral = (RexLiteral) rhs; + refNode = lhs; + } else { + // must have at least one literal + return null; + } + + if (RexLiteral.isNullLiteral(rexLiteral)) { + // we are not handling is NULL filter here thus we bail out if Literal is null + return null; + } + final String literalValue = toDruidLiteral(rexLiteral, rowType, druidQuery); + if (literalValue == null) { + // can not translate literal better bail out + return null; + } + final boolean isNumeric = refNode.getType().getFamily() == SqlTypeFamily.NUMERIC + || rexLiteral.getType().getFamily() == SqlTypeFamily.NUMERIC; + final Pair<String, ExtractionFunction> druidColumn = DruidQuery.toDruidColumn(refNode, rowType, + druidQuery); + final String columnName = druidColumn.left; + final ExtractionFunction extractionFunction = druidColumn.right; + if (columnName == null) { + // no column name better bail out. + return null; + } + final DruidJsonFilter partialFilter; + if (isNumeric) { + //need bound filter since it one of operands is numeric + partialFilter = new JsonBound(columnName, literalValue, false, literalValue, false, true, + extractionFunction); + } else { + partialFilter = new JsonSelector(columnName, literalValue, extractionFunction); + } + + if (rexNode.getKind() == SqlKind.EQUALS) { + return partialFilter; + } + return toNotDruidFilter(partialFilter); + } + + + /** + * @param rexNode rexNode to translate + * @param rowType row type associated to Filter + * @param druidQuery druid query + * + * @return valid Druid Json Bound Filter or null if it can not translate the rexNode. 
+ */ + @Nullable + private static DruidJsonFilter toBoundDruidFilter(RexNode rexNode, RelDataType rowType, + DruidQuery druidQuery) { + final RexCall rexCall = (RexCall) rexNode; + final RexLiteral rexLiteral; + if (rexCall.getOperands().size() < 2) { + return null; + } + final RexNode refNode; + final RexNode lhs = rexCall.getOperands().get(0); + final RexNode rhs = rexCall.getOperands().get(1); + final boolean lhsIsRef; + if (lhs.getKind() == SqlKind.LITERAL && rhs.getKind() != SqlKind.LITERAL) { + rexLiteral = (RexLiteral) lhs; + refNode = rhs; + lhsIsRef = false; + } else if (rhs.getKind() == SqlKind.LITERAL && lhs.getKind() != SqlKind.LITERAL) { + rexLiteral = (RexLiteral) rhs; + refNode = lhs; + lhsIsRef = true; + } else { + // must have at least one literal + return null; + } + + if (RexLiteral.isNullLiteral(rexLiteral)) { + // we are not handling is NULL filter here thus we bail out if Literal is null + return null; + } + final String literalValue = DruidJsonFilter.toDruidLiteral(rexLiteral, rowType, druidQuery); + if (literalValue == null) { + // can not translate literal better bail out + return null; + } + final boolean isNumeric = refNode.getType().getFamily() == SqlTypeFamily.NUMERIC + || rexLiteral.getType().getFamily() == SqlTypeFamily.NUMERIC; + final Pair<String, ExtractionFunction> druidColumn = DruidQuery.toDruidColumn(refNode, rowType, + druidQuery); + final String columnName = druidColumn.left; + final ExtractionFunction extractionFunction = druidColumn.right; + if (columnName == null) { + // no column name better bail out. 
+ return null; + } + switch (rexCall.getKind()) { + case LESS_THAN_OR_EQUAL: + case LESS_THAN: + if (lhsIsRef) { + return new JsonBound(columnName, null, false, literalValue, + rexCall.getKind() == SqlKind.LESS_THAN, isNumeric, + extractionFunction); + } else { + return new JsonBound(columnName, literalValue, rexCall.getKind() == SqlKind.LESS_THAN, null, + false, isNumeric, + extractionFunction); + } + case GREATER_THAN_OR_EQUAL: + case GREATER_THAN: + if (!lhsIsRef) { + return new JsonBound(columnName, null, false, literalValue, + rexCall.getKind() == SqlKind.GREATER_THAN, isNumeric, + extractionFunction); + } else { + return new JsonBound(columnName, literalValue, rexCall.getKind() == SqlKind.GREATER_THAN, + null, + false, isNumeric, + extractionFunction); + } + default: + return null; + } + + } + + /** + * @param rexNode rexNode to translate to Druid literal equivalante + * @param rowType rowType associated to rexNode + * @param druidQuery druid Query + * + * @return non null string or null if it can not translate to valid Druid equivalent + */ + @Nullable + private static String toDruidLiteral(RexNode rexNode, RelDataType rowType, + DruidQuery druidQuery) { + final SimpleDateFormat dateFormatter = new SimpleDateFormat( + TimeExtractionFunction.ISO_TIME_FORMAT, + Locale.ROOT); + final String timeZone = druidQuery.getConnectionConfig().timeZone(); + if (timeZone != null) { + dateFormatter.setTimeZone(TimeZone.getTimeZone(timeZone)); + } + final String val; + final RexLiteral rhsLiteral = (RexLiteral) rexNode; + if (SqlTypeName.NUMERIC_TYPES.contains(rhsLiteral.getTypeName())) { + val = String.valueOf(RexLiteral.value(rhsLiteral)); + } else if (SqlTypeName.CHAR_TYPES.contains(rhsLiteral.getTypeName())) { + val = String.valueOf(RexLiteral.stringValue(rhsLiteral)); + } else if (SqlTypeName.TIMESTAMP == rhsLiteral.getTypeName() || SqlTypeName.DATE == rhsLiteral + .getTypeName() || SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE == rhsLiteral.getTypeName()) { + 
TimestampString timestampString = DruidDateTimeUtils + .literalValue(rexNode, TimeZone.getTimeZone(timeZone)); + if (timestampString == null) { + throw new AssertionError( + "Cannot translate Literal" + rexNode + " of type " + + rhsLiteral.getTypeName() + " to TimestampString"); + } + //@TODO this is unnecessary we can send time as Long (eg millis since epoch) to druid + val = dateFormatter.format(timestampString.getMillisSinceEpoch()); + } else { + // Don't know how to filter on this kind of literal. + val = null; + } + return val; + } + + @Nullable + private static DruidJsonFilter toIsNullKindDruidFilter(RexNode rexNode, RelDataType rowType, + DruidQuery druidQuery) { + if (rexNode.getKind() != SqlKind.IS_NULL && rexNode.getKind() != SqlKind.IS_NOT_NULL) { + throw new AssertionError( + DruidQuery.format("Expecting IS_NULL or IS_NOT_NULL but got [%s]", rexNode.getKind())); + } + final RexCall rexCall = (RexCall) rexNode; + final RexNode refNode = rexCall.getOperands().get(0); + Pair<String, ExtractionFunction> druidColumn = DruidQuery + .toDruidColumn(refNode, rowType, druidQuery); + final String columnName = druidColumn.left; + final ExtractionFunction extractionFunction = druidColumn.right; + if (columnName == null) { + return null; + } + if (rexNode.getKind() == SqlKind.IS_NOT_NULL) { + return toNotDruidFilter(new JsonSelector(columnName, null, extractionFunction)); + } + return new JsonSelector(columnName, null, extractionFunction); + } + + @Nullable + private static DruidJsonFilter toInKindDruidFilter(RexNode e, RelDataType rowType, + DruidQuery druidQuery) { + if (e.getKind() != SqlKind.IN && e.getKind() != SqlKind.NOT_IN) { + throw new AssertionError( + DruidQuery.format("Expecting IN or NOT IN but got [%s]", e.getKind())); + } + ImmutableList.Builder<String> listBuilder = ImmutableList.builder(); + for (RexNode rexNode : ((RexCall) e).getOperands()) { + if (rexNode.getKind() == SqlKind.LITERAL) { + String value = toDruidLiteral(rexNode, rowType, 
druidQuery); + if (value == null) { + return null; + } + listBuilder.add(value); + } + } + Pair<String, ExtractionFunction> druidColumn = DruidQuery + .toDruidColumn(((RexCall) e).getOperands().get(0), + rowType, druidQuery); + final String columnName = druidColumn.left; + final ExtractionFunction extractionFunction = druidColumn.right; + if (columnName == null) { + return null; + } + if (e.getKind() != SqlKind.NOT_IN) { + return new DruidJsonFilter.JsonInFilter(columnName, listBuilder.build(), extractionFunction); + } else { + return toNotDruidFilter( + new DruidJsonFilter.JsonInFilter(columnName, listBuilder.build(), extractionFunction)); + } + } + + @Nullable + protected static DruidJsonFilter toNotDruidFilter(DruidJsonFilter druidJsonFilter) { + if (druidJsonFilter == null) { + return null; + } + return new JsonCompositeFilter(Type.NOT, druidJsonFilter); + } + + @Nullable + private static DruidJsonFilter toBetweenDruidFilter(RexNode rexNode, RelDataType rowType, + DruidQuery query) { + if (rexNode.getKind() != SqlKind.BETWEEN) { + return null; + } + final RexCall rexCall = (RexCall) rexNode; + if (rexCall.getOperands().size() < 4) { + return null; + } + // BETWEEN (ASYMMETRIC, REF, 'lower-bound', 'upper-bound') + final RexNode refNode = rexCall.getOperands().get(1); + final RexNode lhs = rexCall.getOperands().get(2); + final RexNode rhs = rexCall.getOperands().get(3); + + final String lhsLiteralValue = toDruidLiteral(lhs, rowType, query); + final String rhsLiteralValue = toDruidLiteral(rhs, rowType, query); + if (lhsLiteralValue == null || rhsLiteralValue == null) { + return null; + } + final boolean isNumeric = lhs.getType().getFamily() == SqlTypeFamily.NUMERIC + || lhs.getType().getFamily() == SqlTypeFamily.NUMERIC; + final Pair<String, ExtractionFunction> druidColumn = DruidQuery + .toDruidColumn(refNode, rowType, query); + final String columnName = druidColumn.left; + final ExtractionFunction extractionFunction = druidColumn.right; + + if (columnName == 
null) { + return null; + } + return new JsonBound(columnName, lhsLiteralValue, false, rhsLiteralValue, + false, isNumeric, + extractionFunction); + + } + + @Nullable + private static DruidJsonFilter toSimpleDruidFilter(RexNode e, RelDataType rowType, + DruidQuery druidQuery) { + switch (e.getKind()) { + case EQUALS: + case NOT_EQUALS: + return toEqualityKindDruidFilter(e, rowType, druidQuery); + case GREATER_THAN: + case GREATER_THAN_OR_EQUAL: + case LESS_THAN: + case LESS_THAN_OR_EQUAL: + return toBoundDruidFilter(e, rowType, druidQuery); + case BETWEEN: + return toBetweenDruidFilter(e, rowType, druidQuery); + case IN: + case NOT_IN: + return toInKindDruidFilter(e, rowType, druidQuery); + case IS_NULL: + case IS_NOT_NULL: + return toIsNullKindDruidFilter(e, rowType, druidQuery); + default: + return null; + } + } + + /** + * @param rexNode rexNode to translate to Druid Filter + * @param rowType rowType of filter input + * @param druidQuery Druid query + * + * @return Druid Json Filters or null when can not translate to valid Druid Filters. + */ + @Nullable + static DruidJsonFilter toDruidFilters(final RexNode rexNode, RelDataType rowType, + DruidQuery druidQuery) { + if (rexNode.isAlwaysTrue()) { + return JsonExpressionFilter.alwaysTrue(); + } + if (rexNode.isAlwaysFalse()) { + return JsonExpressionFilter.alwaysFalse(); + } + switch (rexNode.getKind()) { + case IS_TRUE: + case IS_NOT_FALSE: + return toDruidFilters(Iterables.getOnlyElement(((RexCall) rexNode).getOperands()), rowType, + druidQuery); + case IS_NOT_TRUE: + case IS_FALSE: + final DruidJsonFilter simpleFilter = toDruidFilters(Iterables + .getOnlyElement(((RexCall) rexNode).getOperands()), rowType, druidQuery); + return simpleFilter != null ? 
new JsonCompositeFilter(Type.NOT, simpleFilter) + : simpleFilter; + case AND: + case OR: + case NOT: + final RexCall call = (RexCall) rexNode; + final List<DruidJsonFilter> jsonFilters = Lists.newArrayList(); + for (final RexNode e : call.getOperands()) { + final DruidJsonFilter druidFilter = toDruidFilters(e, rowType, druidQuery); + if (druidFilter == null) { + return null; + } + jsonFilters.add(druidFilter); + } + return new JsonCompositeFilter(Type.valueOf(rexNode.getKind().name()), + jsonFilters); + } + + final DruidJsonFilter simpleLeafFilter = toSimpleDruidFilter(rexNode, rowType, druidQuery); + return simpleLeafFilter == null + ? toDruidExpressionFilter(rexNode, rowType, druidQuery) + : simpleLeafFilter; + } + + @Nullable + private static DruidJsonFilter toDruidExpressionFilter(RexNode rexNode, RelDataType rowType, + DruidQuery query) { + final String expression = DruidExpressions.toDruidExpression(rexNode, rowType, query); + return expression == null ? null : new JsonExpressionFilter(expression); + } + + /** + * Supported filter types + */ + protected enum Type { + AND, + OR, + NOT, + SELECTOR, + IN, + BOUND, + EXPRESSION; + + public String lowercase() { + return name().toLowerCase(Locale.ROOT); + } + } + + protected final Type type; + + private DruidJsonFilter(Type type) { + this.type = type; + } + + /** + * Druid Expression filter. + */ + public static class JsonExpressionFilter extends DruidJsonFilter { + private final String expression; + + JsonExpressionFilter(String expression) { + super(Type.EXPRESSION); + this.expression = Preconditions.checkNotNull(expression); + } + + @Override public void write(JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeStringField("type", type.lowercase()); + generator.writeStringField("expression", expression); + generator.writeEndObject(); + } + + /** + * We need to push to Druid an expression that always evaluates to true. 
+ */ + private static JsonExpressionFilter alwaysTrue() { + return new JsonExpressionFilter("1 == 1"); + } + + /** + * We need to push to Druid an expression that always evaluates to false. + */ + private static JsonExpressionFilter alwaysFalse() { + return new JsonExpressionFilter("1 == 2"); + } + } + + /** + * Equality filter. + */ + private static class JsonSelector extends DruidJsonFilter { + private final String dimension; + + private final String value; + + private final ExtractionFunction extractionFunction; + + private JsonSelector(String dimension, String value, + ExtractionFunction extractionFunction) { + super(Type.SELECTOR); + this.dimension = dimension; + this.value = value; + this.extractionFunction = extractionFunction; + } + + public void write(JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeStringField("type", type.lowercase()); + generator.writeStringField("dimension", dimension); + generator.writeStringField("value", value); + DruidQuery.writeFieldIf(generator, "extractionFn", extractionFunction); + generator.writeEndObject(); + } + } + + /** + * Bound filter. 
+ */ + @VisibleForTesting + protected static class JsonBound extends DruidJsonFilter { + private final String dimension; + + private final String lower; + + private final boolean lowerStrict; + + private final String upper; + + private final boolean upperStrict; + + private final boolean alphaNumeric; + + private final ExtractionFunction extractionFunction; + + protected JsonBound(String dimension, String lower, + boolean lowerStrict, String upper, boolean upperStrict, + boolean alphaNumeric, ExtractionFunction extractionFunction) { + super(Type.BOUND); + this.dimension = dimension; + this.lower = lower; + this.lowerStrict = lowerStrict; + this.upper = upper; + this.upperStrict = upperStrict; + this.alphaNumeric = alphaNumeric; + this.extractionFunction = extractionFunction; + } + + public void write(JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeStringField("type", type.lowercase()); + generator.writeStringField("dimension", dimension); + if (lower != null) { + generator.writeStringField("lower", lower); + generator.writeBooleanField("lowerStrict", lowerStrict); + } + if (upper != null) { + generator.writeStringField("upper", upper); + generator.writeBooleanField("upperStrict", upperStrict); + } + if (alphaNumeric) { + generator.writeStringField("ordering", "numeric"); + } else { + generator.writeStringField("ordering", "lexicographic"); + } + DruidQuery.writeFieldIf(generator, "extractionFn", extractionFunction); + generator.writeEndObject(); + } + } + + /** + * Filter that combines other filters using a boolean operator. + */ + private static class JsonCompositeFilter extends DruidJsonFilter { + private final List<? extends DruidJsonFilter> fields; + + private JsonCompositeFilter(Type type, + Iterable<? extends DruidJsonFilter> fields) { + super(type); + this.fields = ImmutableList.copyOf(fields); + } + + private JsonCompositeFilter(Type type, DruidJsonFilter... 
fields) { + this(type, ImmutableList.copyOf(fields)); + } + + public void write(JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeStringField("type", type.lowercase()); + switch (type) { + case NOT: + DruidQuery.writeField(generator, "field", fields.get(0)); + break; + default: + DruidQuery.writeField(generator, "fields", fields); + } + generator.writeEndObject(); + } + } + + /** + * IN filter. + */ + protected static class JsonInFilter extends DruidJsonFilter { + private final String dimension; + + private final List<String> values; + + private final ExtractionFunction extractionFunction; + + protected JsonInFilter(String dimension, List<String> values, + ExtractionFunction extractionFunction) { + super(Type.IN); + this.dimension = dimension; + this.values = values; + this.extractionFunction = extractionFunction; + } + + public void write(JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeStringField("type", type.lowercase()); + generator.writeStringField("dimension", dimension); + DruidQuery.writeField(generator, "values", values); + DruidQuery.writeFieldIf(generator, "extractionFn", extractionFunction); + generator.writeEndObject(); + } + } + + public static DruidJsonFilter getSelectorFilter(String column, String value, + ExtractionFunction extractionFunction) { + Preconditions.checkNotNull(column); + return new JsonSelector(column, value, extractionFunction); + } + + /** + * Druid Having Filter spec + */ + protected static class JsonDimHavingFilter implements DruidJson { + + private final DruidJsonFilter filter; + + public JsonDimHavingFilter(DruidJsonFilter filter) { + this.filter = filter; + } + + @Override public void write(JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeStringField("type", "filter"); + DruidQuery.writeField(generator, "filter", filter); + generator.writeEndObject(); + } + } +} + +// End DruidJsonFilter.java
