This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
     new 6cea55defa DRILL-8526: Hive Predicate Push Down for ORC and Parquet (#2995)
6cea55defa is described below

commit 6cea55defad03d158e1f958abc65fdbe9d970550
Author: shfshihuafeng <shfshihuaf...@163.com>
AuthorDate: Thu Aug 7 23:09:40 2025 +0800

    DRILL-8526: Hive Predicate Push Down for ORC and Parquet (#2995)
---
 .../org/apache/drill/exec/store/hive/HiveScan.java |  35 ++-
 .../drill/exec/store/hive/HiveStoragePlugin.java   |   3 +
 .../filter/HiveCompareFunctionsProcessor.java      | 266 +++++++++++++++++++++
 .../exec/store/hive/readers/filter/HiveFilter.java |  71 ++++++
 .../hive/readers/filter/HiveFilterBuilder.java     | 212 ++++++++++++++++
 .../readers/filter/HivePushFilterIntoScan.java     | 171 +++++++++++++
 .../apache/drill/exec/TestHiveFilterPushDown.java  | 124 ++++++++++
 .../exec/store/hive/HiveTestDataGenerator.java     |  13 +
 8 files changed, 886 insertions(+), 9 deletions(-)

diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index f26f68eb25..9ec05d9136 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -39,11 +40,13 @@
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider.HiveStats;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider.LogicalInputSplit;
 import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
+import org.apache.drill.exec.store.hive.readers.filter.HiveFilter;
 import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +57,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;

 import static org.apache.drill.exec.store.hive.HiveUtilities.createPartitionWithSpecColumns;
+import static org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg.SARG_PUSHDOWN;

 @JsonTypeName("hive-scan")
 public class HiveScan extends AbstractGroupScan {
@@ -70,7 +74,7 @@ public class HiveScan extends AbstractGroupScan {
   private List<LogicalInputSplit> inputSplits;

   protected List<SchemaPath> columns;
-
+  private boolean filterPushedDown = false;

   @JsonCreator
   public HiveScan(@JsonProperty("userName") final String userName,
                   @JsonProperty("hiveReadEntry") final HiveReadEntry hiveReadEntry,
@@ -171,6 +175,16 @@ public class HiveScan extends AbstractGroupScan {
     return !(partitionKeys == null || partitionKeys.size() == 0);
   }

+  @JsonIgnore
+  public void setFilterPushedDown(boolean isPushedDown) {
+    this.filterPushedDown = isPushedDown;
+  }
+
+  @JsonIgnore
+  public boolean isFilterPushedDown() {
+    return filterPushedDown;
+  }
+
   @Override
   public void applyAssignments(final
List<CoordinationProtos.DrillbitEndpoint> endpoints) { mappings = new ArrayList<>(); @@ -295,14 +309,17 @@ public class HiveScan extends AbstractGroupScan { public String toString() { List<HivePartitionWrapper> partitions = hiveReadEntry.getHivePartitionWrappers(); int numPartitions = partitions == null ? 0 : partitions.size(); - return "HiveScan [table=" + hiveReadEntry.getHiveTableWrapper() - + ", columns=" + columns - + ", numPartitions=" + numPartitions - + ", partitions= " + partitions - + ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry) - + ", confProperties=" + confProperties - + ", maxRecords=" + maxRecords - + "]"; + String SearchArgumentString = confProperties.get(SARG_PUSHDOWN); + SearchArgument searchArgument = SearchArgumentString == null ? null : HiveFilter.create(SearchArgumentString); + + return new PlanStringBuilder(this) + .field("table", hiveReadEntry.getHiveTableWrapper()) + .field("columns", columns) + .field("numPartitions", numPartitions) + .field("inputDirectories", metadataProvider.getInputDirectories(hiveReadEntry)) + .field("confProperties", confProperties) + .field("SearchArgument", searchArgument) + .toString(); } @Override diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java index a85159fe1e..f18d005e7f 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java @@ -48,6 +48,7 @@ import org.apache.drill.exec.server.options.SessionOptionManager; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.drill.exec.store.hive.readers.filter.HivePushFilterIntoScan; import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory; import com.google.common.collect.ImmutableSet; @@ -200,6 +201,8 @@ public class HiveStoragePlugin extends AbstractStoragePlugin { options.getBoolean(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER)) { ruleBuilder.add(ConvertHiveParquetScanToDrillParquetScan.INSTANCE); } + ruleBuilder.add(HivePushFilterIntoScan.FILTER_ON_PROJECT); + ruleBuilder.add(HivePushFilterIntoScan.FILTER_ON_SCAN); return ruleBuilder.build(); } default: diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveCompareFunctionsProcessor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveCompareFunctionsProcessor.java new file mode 100644 index 0000000000..6a76e7aca4 --- /dev/null +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveCompareFunctionsProcessor.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.hive.readers.filter; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.drill.common.FunctionNames; +import org.apache.drill.common.expression.CastExpression; +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.common.expression.ValueExpressions.BooleanExpression; +import org.apache.drill.common.expression.ValueExpressions.DateExpression; +import org.apache.drill.common.expression.ValueExpressions.DoubleExpression; +import org.apache.drill.common.expression.ValueExpressions.FloatExpression; +import org.apache.drill.common.expression.ValueExpressions.IntExpression; +import org.apache.drill.common.expression.ValueExpressions.LongExpression; +import org.apache.drill.common.expression.ValueExpressions.QuotedString; +import org.apache.drill.common.expression.ValueExpressions.TimeExpression; +import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression; +import org.apache.drill.common.expression.visitors.AbstractExprVisitor; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; + +import java.sql.Timestamp; + +public class HiveCompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> { + + private static final ImmutableSet<String> IS_FUNCTIONS_SET; + + private Object value; + private PredicateLeaf.Type valueType; + private boolean success; + private SchemaPath path; + private String functionName; + + public HiveCompareFunctionsProcessor(String functionName) { + this.success = false; + this.functionName = functionName; + } + + static { + ImmutableSet.Builder<String> builder = ImmutableSet.builder(); + IS_FUNCTIONS_SET = builder + .add(FunctionNames.IS_NOT_NULL) + .add("isNotNull") + .add("is not null") + .add(FunctionNames.IS_NULL) + .add("isNull") + .add("is null") + .add(FunctionNames.IS_TRUE) + .add(FunctionNames.IS_NOT_TRUE) + .add(FunctionNames.IS_FALSE) + .add(FunctionNames.IS_NOT_FALSE) + .build(); + + } + + private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP; + static { + ImmutableMap.Builder<String, String> builder = ImmutableMap.builder(); + COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder + // binary functions + .put(FunctionNames.LIKE, FunctionNames.LIKE) + .put(FunctionNames.EQ, FunctionNames.EQ) + .put(FunctionNames.NE, FunctionNames.NE) + .put(FunctionNames.GE, FunctionNames.LE) + .put(FunctionNames.GT, FunctionNames.LT) + .put(FunctionNames.LE, FunctionNames.GE) + .put(FunctionNames.LT, FunctionNames.GT) + .build(); + } + + private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES; + static { + ImmutableSet.Builder<Class<? 
extends LogicalExpression>> builder = ImmutableSet.builder(); + VALUE_EXPRESSION_CLASSES = builder + .add(BooleanExpression.class) + .add(DateExpression.class) + .add(DoubleExpression.class) + .add(FloatExpression.class) + .add(IntExpression.class) + .add(LongExpression.class) + .add(QuotedString.class) + .add(TimeExpression.class) + .build(); + } + + public static boolean isCompareFunction(final String functionName) { + return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName); + } + + public static boolean isIsFunction(final String funcName) { + return IS_FUNCTIONS_SET.contains(funcName); + } + + // shows whether function is simplified IS FALSE + public static boolean isNot(final FunctionCall call, final String funcName) { + return !call.args().isEmpty() + && FunctionNames.NOT.equals(funcName); + } + + public static HiveCompareFunctionsProcessor createFunctionsProcessorInstance(final FunctionCall call) { + String functionName = call.getName(); + HiveCompareFunctionsProcessor evaluator = new HiveCompareFunctionsProcessor(functionName); + + return createFunctionsProcessorInstanceInternal(call, evaluator); + } + + protected static <T extends HiveCompareFunctionsProcessor> T createFunctionsProcessorInstanceInternal(FunctionCall call, T evaluator) { + LogicalExpression nameArg = call.arg(0); + LogicalExpression valueArg = call.argCount() >= 2 ? call.arg(1) : null; + if (valueArg != null) { // binary function + if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) { + LogicalExpression swapArg = valueArg; + valueArg = nameArg; + nameArg = swapArg; + evaluator.setFunctionName(COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(evaluator.getFunctionName())); + } + evaluator.setSuccess(nameArg.accept(evaluator, valueArg)); + } else if (call.arg(0) instanceof SchemaPath) { + evaluator.setPath((SchemaPath) nameArg); + } + evaluator.setSuccess(true); + return evaluator; + } + + public boolean isSuccess() { + return success; + } + + protected void setSuccess(boolean success) { + this.success = success; + } + + public SchemaPath getPath() { + return path; + } + + protected void setPath(SchemaPath path) { + this.path = path; + } + + public String getFunctionName() { + return functionName; + } + + protected void setFunctionName(String functionName) { + this.functionName = functionName; + } + + public Object getValue() { + return value; + } + + public void setValue(Object value) { + this.value = value; + } + + public PredicateLeaf.Type getValueType() { + return valueType; + } + + public void setValueType(PredicateLeaf.Type valueType) { + this.valueType = valueType; + } + + @Override + public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException { + if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) { + return e.getInput().accept(this, valueArg); + } + return false; + } + + @Override + public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException { + return false; + } + + @Override + public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException { + if (valueArg instanceof QuotedString) { + this.value = ((QuotedString) valueArg).getString(); + this.path = path; + this.valueType = PredicateLeaf.Type.STRING; + return true; + } + + if (valueArg instanceof IntExpression) { + int expValue = ((IntExpression) valueArg).getInt(); + this.value = ((Integer) expValue).longValue(); + this.path = path; + this.valueType = PredicateLeaf.Type.LONG; + return true; + } + + 
if (valueArg instanceof LongExpression) { + this.value = ((LongExpression) valueArg).getLong(); + this.path = path; + this.valueType = PredicateLeaf.Type.LONG; + return true; + } + + if (valueArg instanceof FloatExpression) { + this.value = ((FloatExpression) valueArg).getFloat(); + this.path = path; + this.valueType = PredicateLeaf.Type.FLOAT; + return true; + } + + if (valueArg instanceof DoubleExpression) { + this.value = ((DoubleExpression) valueArg).getDouble(); + this.path = path; + this.valueType = PredicateLeaf.Type.FLOAT; + return true; + } + + if (valueArg instanceof BooleanExpression) { + this.value = ((BooleanExpression) valueArg).getBoolean(); + this.path = path; + this.valueType = PredicateLeaf.Type.BOOLEAN; + return true; + } + + if (valueArg instanceof DateExpression) { + this.value = ((DateExpression) valueArg).getDate(); + this.path = path; + this.valueType = PredicateLeaf.Type.LONG; + return true; + } + + if (valueArg instanceof TimeStampExpression) { + long timeStamp = ((TimeStampExpression) valueArg).getTimeStamp(); + this.value = new Timestamp(timeStamp); + this.path = path; + this.valueType = PredicateLeaf.Type.TIMESTAMP; + return true; + } + + if (valueArg instanceof ValueExpressions.VarDecimalExpression) { + double v = ((ValueExpressions.VarDecimalExpression) valueArg).getBigDecimal().doubleValue(); + this.value = new HiveDecimalWritable(String.valueOf(v)); + this.path = path; + this.valueType = PredicateLeaf.Type.DECIMAL; + return true; + } + return false; + } +} diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilter.java new file mode 100644 index 0000000000..46d973696b --- /dev/null +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.hive.readers.filter; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl; +import org.apache.hive.com.esotericsoftware.kryo.Kryo; +import org.apache.hive.com.esotericsoftware.kryo.io.Input; +import org.apache.hive.com.esotericsoftware.kryo.io.Output; +/** + * Primary interface for <a href="http://en.wikipedia.org/wiki/Sargable"> + * SearchArgument</a>, which are the subset of predicates + * that can be pushed down to the RecordReader. Each SearchArgument consists + * of a series of SearchClauses that must each be true for the row to be + * accepted by the filter. 
+ * + * This requires that the filter be normalized into conjunctive normal form + * (<a href="http://en.wikipedia.org/wiki/Conjunctive_normal_form">CNF</a>). + */ +public class HiveFilter { + + private final SearchArgument searchArgument; + private final static ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() { + protected Kryo initialValue() { return new Kryo(); } + }; + + public HiveFilter(SearchArgument searchArgument) { + this.searchArgument = searchArgument; + } + + private static String toKryo(SearchArgument sarg) { + Output out = new Output(4 * 1024, 10 * 1024 * 1024); + new Kryo().writeObject(out, sarg); + out.close(); + return Base64.encodeBase64String(out.toBytes()); + } + + @VisibleForTesting + public static SearchArgument create(String kryo) { + return create(Base64.decodeBase64(kryo)); + } + + private static SearchArgument create(byte[] kryoBytes) { + return kryo.get().readObject(new Input(kryoBytes), SearchArgumentImpl.class); + } + + public SearchArgument getSearchArgument() { + return searchArgument; + } + + public String getSearchArgumentString() { + return toKryo(searchArgument); + } +} diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilterBuilder.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilterBuilder.java new file mode 100644 index 0000000000..92c23b67cc --- /dev/null +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilterBuilder.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.hive.readers.filter; + +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.drill.common.FunctionNames; +import org.apache.drill.common.expression.BooleanOperator; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.visitors.AbstractExprVisitor; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; + +public class HiveFilterBuilder extends AbstractExprVisitor<SearchArgument.Builder, Void, RuntimeException> { + + private static final Logger logger = LoggerFactory.getLogger(HiveFilterBuilder.class); + + private final LogicalExpression le; + private final SearchArgument.Builder builder; + private final HashMap<String, SqlTypeName> dataTypeMap; + + private boolean allExpressionsConverted = false; + + HiveFilterBuilder(final LogicalExpression le, final HashMap<String, SqlTypeName> dataTypeMap) { + this.le = le; + this.dataTypeMap = dataTypeMap; + this.builder = SearchArgumentFactory.newBuilder(); + } + + public HiveFilter parseTree() { + SearchArgument.Builder accept = le.accept(this, null); + SearchArgument searchArgument = builder.build(); + if (accept != null) { + searchArgument = accept.build(); + } + return new HiveFilter(searchArgument); + } + + public boolean isAllExpressionsConverted() { + return allExpressionsConverted; + } + + @Override + public SearchArgument.Builder visitUnknown(LogicalExpression e, Void value) throws RuntimeException { + allExpressionsConverted = false; + return null; + } + + @Override + public SearchArgument.Builder visitSchemaPath(SchemaPath path, Void value) throws RuntimeException { + if (path instanceof FieldReference) { + String fieldName = path.getAsNamePart().getName(); + SqlTypeName sqlTypeName = dataTypeMap.get(fieldName); + switch (sqlTypeName) { + case BOOLEAN: + PredicateLeaf.Type valueType = convertLeafType(sqlTypeName); + builder.startNot().equals(fieldName, valueType, false).end(); + break; + default: + // otherwise, we don't know what to do so make it a maybe or do nothing + builder.literal(SearchArgument.TruthValue.YES_NO_NULL); + } + } + return builder; + } + + @Override + public SearchArgument.Builder visitBooleanOperator(BooleanOperator op, Void value) throws RuntimeException { + return visitFunctionCall(op, value); + } + + @Override + public SearchArgument.Builder visitFunctionCall(FunctionCall call, Void value) throws RuntimeException { + String functionName = call.getName(); + List<LogicalExpression> args = call.args(); + if (HiveCompareFunctionsProcessor.isCompareFunction(functionName) || HiveCompareFunctionsProcessor.isIsFunction(functionName) || HiveCompareFunctionsProcessor.isNot(call, functionName)) { + HiveCompareFunctionsProcessor processor = HiveCompareFunctionsProcessor.createFunctionsProcessorInstance(call); + if (processor.isSuccess()) { + return buildSearchArgument(processor); + } + } else { + switch (functionName) { + case FunctionNames.AND: + builder.startAnd(); + break; + case FunctionNames.OR: + builder.startOr(); + break; + default: + logger.warn("Unsupported logical operator:{} for push down", functionName); + 
return builder; + } + for (LogicalExpression arg : args) { + arg.accept(this, null); + } + builder.end(); + } + return builder; + } + + private SearchArgument.Builder buildSearchArgument(final HiveCompareFunctionsProcessor processor) { + String functionName = processor.getFunctionName(); + SchemaPath field = processor.getPath(); + Object fieldValue = processor.getValue(); + PredicateLeaf.Type valueType = processor.getValueType(); + SqlTypeName sqlTypeName = dataTypeMap.get(field.getAsNamePart().getName()); + if (fieldValue == null) { + valueType = convertLeafType(sqlTypeName); + } + if (valueType == null) { + return builder; + } + switch (functionName) { + case FunctionNames.EQ: + builder.startAnd().equals(field.getAsNamePart().getName(), valueType, fieldValue).end(); + break; + case FunctionNames.NE: + builder.startNot().equals(field.getAsNamePart().getName(), valueType, fieldValue).end(); + break; + case FunctionNames.GE: + builder.startNot().lessThan(field.getAsNamePart().getName(), valueType, fieldValue).end(); + break; + case FunctionNames.GT: + builder.startNot().lessThanEquals(field.getAsNamePart().getName(), valueType, fieldValue).end(); + break; + case FunctionNames.LE: + builder.startAnd().lessThanEquals(field.getAsNamePart().getName(), valueType, fieldValue).end(); + break; + case FunctionNames.LT: + builder.startAnd().lessThan(field.getAsNamePart().getName(), valueType, fieldValue).end(); + break; + case FunctionNames.IS_NULL: + case "isNull": + case "is null": + builder.startAnd().isNull(field.getAsNamePart().getName(), valueType).end(); + break; + case FunctionNames.IS_NOT_NULL: + case "isNotNull": + case "is not null": + builder.startNot().isNull(field.getAsNamePart().getName(), valueType).end(); + break; + case FunctionNames.IS_NOT_TRUE: + case FunctionNames.IS_FALSE: + case FunctionNames.NOT: + builder.startNot().equals(field.getAsNamePart().getName(), valueType, true).end(); + break; + case FunctionNames.IS_TRUE: + case FunctionNames.IS_NOT_FALSE: + builder.startNot().equals(field.getAsNamePart().getName(), valueType, false).end(); + break; + } + return builder; + } + + private PredicateLeaf.Type convertLeafType(SqlTypeName sqlTypeName) { + PredicateLeaf.Type type = null; + switch (sqlTypeName) { + case BOOLEAN: + type = PredicateLeaf.Type.BOOLEAN; + break; + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + type = PredicateLeaf.Type.LONG; + break; + case DOUBLE: + case FLOAT: + type = PredicateLeaf.Type.FLOAT; + break; + case DECIMAL: + type = PredicateLeaf.Type.DECIMAL; + break; + case DATE: + type = PredicateLeaf.Type.DATE; + break; + case TIMESTAMP: + type = PredicateLeaf.Type.TIMESTAMP; + break; + case CHAR: + case VARCHAR: + type = PredicateLeaf.Type.STRING; + break; + default: + logger.warn("Not support push down type:" + sqlTypeName.getName()); + } + return type; + } +} diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HivePushFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HivePushFilterIntoScan.java new file mode 100644 index 0000000000..f372521ec7 --- /dev/null +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HivePushFilterIntoScan.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.hive.readers.filter; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.exec.planner.logical.DrillOptiq; +import org.apache.drill.exec.planner.logical.DrillParseContext; +import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.physical.FilterPrel; +import org.apache.drill.exec.planner.physical.PrelUtil; +import org.apache.drill.exec.planner.physical.ProjectPrel; +import org.apache.drill.exec.planner.physical.ScanPrel; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.drill.exec.store.hive.HiveScan; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg.SARG_PUSHDOWN; + +public abstract class HivePushFilterIntoScan extends StoragePluginOptimizerRule { + + private HivePushFilterIntoScan(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + public static final StoragePluginOptimizerRule FILTER_ON_SCAN = + new HivePushFilterIntoScan(RelOptHelper.some(FilterPrel.class, + RelOptHelper.any(ScanPrel.class)), "HivePushFilterIntoScan:Filter_On_Scan") { + + @Override + public void onMatch(RelOptRuleCall call) { + final ScanPrel scan = call.rel(1); + final FilterPrel filter = call.rel(0); + final RexNode condition = filter.getCondition(); + + HiveScan groupScan = (HiveScan) scan.getGroupScan(); + if (groupScan.isFilterPushedDown()) { + /* + * The rule can get triggered again due to the transformed "scan => filter" sequence + * created by the earlier execution of this rule when we could not do a complete + * conversion of Optiq Filter's condition to Hive Filter. In such cases, we rely upon + * this flag to not do a re-processing of the rule on the already transformed call. 
+ */ + return; + } + + doPushFilterToScan(call, filter, null, scan, groupScan, condition); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final ScanPrel scan = call.rel(1); + if (scan.getGroupScan() instanceof HiveScan) { + return super.matches(call); + } + return false; + } + }; + + public static final StoragePluginOptimizerRule FILTER_ON_PROJECT = + new HivePushFilterIntoScan(RelOptHelper.some(FilterPrel.class, + RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))), + "HivePushFilterIntoScan:Filter_On_Project") { + + @Override + public void onMatch(RelOptRuleCall call) { + final ScanPrel scan = call.rel(2); + final ProjectPrel project = call.rel(1); + final FilterPrel filter = call.rel(0); + + HiveScan groupScan = (HiveScan) scan.getGroupScan(); + if (groupScan.isFilterPushedDown()) { + /* + * The rule can get triggered again due to the transformed "scan => filter" sequence + * created by the earlier execution of this rule when we could not do a complete + * conversion of Optiq Filter's condition to Hive Filter. In such cases, we rely upon + * this flag to not do a re-processing of the rule on the already transformed call. + */ + return; + } + + // convert the filter to one that references the child of the project + final RexNode condition = RelOptUtil.pushPastProject(filter.getCondition(), project); + + doPushFilterToScan(call, filter, project, scan, groupScan, condition); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final ScanPrel scan = call.rel(2); + if (scan.getGroupScan() instanceof HiveScan) { + return super.matches(call); + } + return false; + } + }; + + + protected void doPushFilterToScan(final RelOptRuleCall call, final FilterPrel filter, + final ProjectPrel project, final ScanPrel scan, final HiveScan groupScan, + final RexNode condition) { + + HashMap<String, SqlTypeName> dataTypeMap = new HashMap<>(); + List<RelDataTypeField> fieldList = scan.getRowType().getFieldList(); + for (RelDataTypeField relDataTypeField : fieldList) { + String name = relDataTypeField.getName(); + SqlTypeName sqlTypeName = relDataTypeField.getValue().getSqlTypeName(); + dataTypeMap.put(name, sqlTypeName); + } + + final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), + scan, condition); + final HiveFilterBuilder orcFilterBuilder = new HiveFilterBuilder(conditionExp, dataTypeMap); + final HiveFilter newScanSpec = orcFilterBuilder.parseTree(); + + if (newScanSpec == null) { + return; //no filter pushdown ==> No transformation. + } + + String searchArgument = newScanSpec.getSearchArgumentString(); + Map<String, String> confProperties = groupScan.getConfProperties(); + confProperties.put(SARG_PUSHDOWN, searchArgument); + HiveScan newGroupsScan = null; + try { + newGroupsScan = new HiveScan(groupScan.getUserName(), groupScan.getHiveReadEntry(), + groupScan.getStoragePlugin(), groupScan.getColumns(), null, confProperties, groupScan.getMaxRecords()); + } catch (ExecutionSetupException e) { + //Why does constructor method HiveScan throw exception? + e.printStackTrace(); + throw new RuntimeException(e); + } + newGroupsScan.setFilterPushedDown(true); + + final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), + newGroupsScan, scan.getRowType(), scan.getTable()); + // Depending on whether is a project in the middle, assign either scan or copy of project to + // childRel. + final RelNode childRel = project == null ? 
newScanPrel : project.copy(project.getTraitSet(), + ImmutableList.of(newScanPrel)); + /* + * we could not convert the entire filter condition expression into an Hive orc filter. + */ + call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel))); + } +} diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveFilterPushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveFilterPushDown.java new file mode 100644 index 0000000000..b821ae5a3b --- /dev/null +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveFilterPushDown.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec; + +import org.apache.drill.categories.HiveStorageTest; +import org.apache.drill.categories.SlowTest; +import org.apache.drill.exec.hive.HiveTestBase; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; + +@Category({SlowTest.class, HiveStorageTest.class}) +public class TestHiveFilterPushDown extends HiveTestBase { + + @BeforeClass + public static void init() { + //set false for test parquet push down + setSessionOption(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER, false); + setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true); + } + + @AfterClass + public static void cleanup() { + resetSessionOption(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER); + resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY); + } + + @Test + public void testPushDownWithEqualForOrc() throws Exception { + String query = "select * from hive.orc_push_down where key = 1"; + + int actualRowCount = testSql(query); + assertEquals("Expected and actual row count should match", 4, actualRowCount); + + testPlanMatchingPatterns(query, + new String[]{"Filter\\(condition=\\[=\\($0, 1\\)\\]\\)", + "SearchArgument=leaf-0 = \\(EQUALS key 1\\), expr = leaf-0"}, + new String[]{}); + } + + @Test + public void testPushDownWithNotEqualForOrc() throws Exception { + String query = "select * from hive.orc_push_down where key <> 1"; + + int actualRowCount = testSql(query); + assertEquals("Expected and actual row count should match", 2, actualRowCount); + + testPlanMatchingPatterns(query, + new String[]{"Filter\\(condition=\\[=\\<\\>\\($0, 1\\)\\]\\)", + "SearchArgument=leaf-0 = \\(EQUALS key 1\\), expr = \\(not leaf-0\\)"}, + new String[]{}); + } + + @Test + public void testPushDownWithGreaterThanForOrc() throws Exception { + String query = "select * from hive.orc_push_down where key > 1"; + + int actualRowCount = 
testSql(query);
+    assertEquals("Expected and actual row count should match", 2, actualRowCount);
+
+    testPlanMatchingPatterns(query,
+      new String[]{"Filter\\(condition=\\[=\\>\\($0, 1\\)\\]\\)",
+        "SearchArgument=leaf-0 = \\(LESS_THAN_EQUALS key 1\\), expr = \\(not leaf-0\\)"},
+      new String[]{});
+  }
+
+  @Test
+  public void testPushDownWithLessThanForOrc() throws Exception {
+    String query = "select * from hive.orc_push_down where key < 2";
+
+    int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 4, actualRowCount);
+
+    testPlanMatchingPatterns(query,
+      new String[]{"Filter\\(condition=\\[=\\<\\($0, 2\\)\\]\\)",
+        "SearchArgument=leaf-0 = \\(LESS_THAN key 2\\), expr = leaf-0"},
+      new String[]{});
+  }
+
+  @Test
+  public void testPushDownWithAndForOrc() throws Exception {
+    String query = "select * from hive.orc_push_down where key = 2 and var_key = 'var_6'";
+
+    int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 1, actualRowCount);
+
+    testPlanMatchingPatterns(query,
+      new String[]{"Filter\\(condition=\\[AND\\(=\\($0, 2\\), =\\($2, 'var_6'\\)\\)\\]\\)",
+        "SearchArgument=leaf-0 = \\(EQUALS key 2\\), leaf-1 = \\(EQUALS var_key var_6\\), expr = \\(and leaf-0 leaf-1\\)"},
+      new String[]{});
+  }
+
+  @Test
+  public void testPushDownWithOrForOrc() throws Exception {
+    String query = "select * from hive.orc_push_down where key = 2 or var_key = 'var_1'";
+
+    int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 3, actualRowCount);
+
+    testPlanMatchingPatterns(query,
+      new String[]{"Filter\\(condition=\\[OR\\(=\\($0, 2\\), =\\($2, 'var_1'\\)\\)\\]\\)",
+        "SearchArgument=leaf-0 = \\(EQUALS key 2\\), leaf-1 = \\(EQUALS var_key var_1\\), expr = \\(or leaf-0 leaf-1\\)"},
+      new String[]{});
+  }
+}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 106fb22963..0a8b7bc3d9 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -454,8 +454,21 @@ public class HiveTestDataGenerator {
       "INNER JOIN kv ON kv.key = dk.key");

     createTableWithEmptyParquet(hiveDriver);
+    createTestDataForDrillORCPushDownReaderTests(hiveDriver);
   }

+  private void createTestDataForDrillORCPushDownReaderTests(Driver hiveDriver) {
+    // Hive managed table that has data qualified for Drill orc filter push down
+    executeQuery(hiveDriver, "create table orc_push_down(key int, int_key int, var_key varchar(10), dec_key decimal(5, 2), boolean_key boolean) stored as orc");
+    // each insert is created in separate file
+    executeQuery(hiveDriver, "insert into table orc_push_down values (1, 1, 'var_1', 1.11,true), (1, 2, 'var_2', 2.22,false)");
+    executeQuery(hiveDriver, "insert into table orc_push_down values (1, 3, 'var_3', 3.33,true), (1, 4, 'var_4', 4.44,true)");
+    executeQuery(hiveDriver, "insert into table orc_push_down values (2, 5, 'var_5', 5.55,false), (2, 6, 'var_6', 6.66,true)");
+    executeQuery(hiveDriver, "insert into table orc_push_down values (null, 7, 'var_7', 7.77,false), (null, 8, 'var_8', 8.88,false)");
+  }
+
+
   private void createTestDataForDrillNativeParquetReaderTests(Driver hiveDriver) {
     // Hive managed table that has data qualified for Drill native filter push down
executeQuery(hiveDriver, "create table kv_native(key int, int_key int, var_key varchar(10), dec_key decimal(5, 2)) stored as parquet");