This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cea55defa DRILL-8526: Hive Predicate Push Down for ORC and Parquet (#2995)
6cea55defa is described below

commit 6cea55defad03d158e1f958abc65fdbe9d970550
Author: shfshihuafeng <shfshihuaf...@163.com>
AuthorDate: Thu Aug 7 23:09:40 2025 +0800

    DRILL-8526: Hive Predicate Push Down for ORC and Parquet (#2995)
---
 .../org/apache/drill/exec/store/hive/HiveScan.java |  35 ++-
 .../drill/exec/store/hive/HiveStoragePlugin.java   |   3 +
 .../filter/HiveCompareFunctionsProcessor.java      | 266 +++++++++++++++++++++
 .../exec/store/hive/readers/filter/HiveFilter.java |  71 ++++++
 .../hive/readers/filter/HiveFilterBuilder.java     | 212 ++++++++++++++++
 .../readers/filter/HivePushFilterIntoScan.java     | 171 +++++++++++++
 .../apache/drill/exec/TestHiveFilterPushDown.java  | 124 ++++++++++
 .../exec/store/hive/HiveTestDataGenerator.java     |  13 +
 8 files changed, 886 insertions(+), 9 deletions(-)
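
For context before the per-file diffs: the change translates a Drill filter condition into a Hive SearchArgument, serializes it into the scan's conf properties under "sarg.pushdown" (ConvertAstToSearchArg.SARG_PUSHDOWN), and lets the Hive ORC/Parquet readers skip non-matching data. A minimal smoke check, sketched against the test table created later in this patch; the JDBC URL and driver setup are assumptions, not part of the patch:

    // imports: java.sql.Connection, java.sql.DriverManager, java.sql.ResultSet, java.sql.Statement
    try (Connection c = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
         Statement s = c.createStatement();
         ResultSet rs = s.executeQuery(
             "explain plan for select * from hive.orc_push_down where key = 1")) {
      while (rs.next()) {
        // With the pushdown applied, the HiveScan line should contain e.g.
        // "SearchArgument=leaf-0 = (EQUALS key 1), expr = leaf-0"
        System.out.println(rs.getString(1));
      }
    }
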

diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index f26f68eb25..9ec05d9136 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -39,11 +40,13 @@ import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider.HiveStats;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider.LogicalInputSplit;
 import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
+import org.apache.drill.exec.store.hive.readers.filter.HiveFilter;
 import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +57,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 import static org.apache.drill.exec.store.hive.HiveUtilities.createPartitionWithSpecColumns;
+import static org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg.SARG_PUSHDOWN;
 
 @JsonTypeName("hive-scan")
 public class HiveScan extends AbstractGroupScan {
@@ -70,7 +74,7 @@ public class HiveScan extends AbstractGroupScan {
   private List<LogicalInputSplit> inputSplits;
 
   protected List<SchemaPath> columns;
-
+  private boolean filterPushedDown = false;
   @JsonCreator
   public HiveScan(@JsonProperty("userName") final String userName,
                   @JsonProperty("hiveReadEntry") final HiveReadEntry 
hiveReadEntry,
@@ -171,6 +175,16 @@ public class HiveScan extends AbstractGroupScan {
     return !(partitionKeys == null || partitionKeys.size() == 0);
   }
 
+  @JsonIgnore
+  public void setFilterPushedDown(boolean isPushedDown) {
+    this.filterPushedDown = isPushedDown;
+  }
+
+  @JsonIgnore
+  public boolean isFilterPushedDown() {
+    return filterPushedDown;
+  }
+
   @Override
   public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
     mappings = new ArrayList<>();
@@ -295,14 +309,17 @@ public class HiveScan extends AbstractGroupScan {
   public String toString() {
     List<HivePartitionWrapper> partitions = hiveReadEntry.getHivePartitionWrappers();
     int numPartitions = partitions == null ? 0 : partitions.size();
-    return "HiveScan [table=" + hiveReadEntry.getHiveTableWrapper()
-        + ", columns=" + columns
-        + ", numPartitions=" + numPartitions
-        + ", partitions= " + partitions
-        + ", inputDirectories=" + 
metadataProvider.getInputDirectories(hiveReadEntry)
-        + ", confProperties=" + confProperties
-        + ", maxRecords=" + maxRecords
-        + "]";
+    String searchArgumentString = confProperties.get(SARG_PUSHDOWN);
+    SearchArgument searchArgument = searchArgumentString == null ? null : HiveFilter.create(searchArgumentString);
+
+    return new PlanStringBuilder(this)
+            .field("table", hiveReadEntry.getHiveTableWrapper())
+            .field("columns", columns)
+            .field("numPartitions", numPartitions)
+            .field("inputDirectories", 
metadataProvider.getInputDirectories(hiveReadEntry))
+            .field("confProperties", confProperties)
+            .field("SearchArgument", searchArgument)
+            .toString();
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index a85159fe1e..f18d005e7f 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -48,6 +48,7 @@ import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.hive.readers.filter.HivePushFilterIntoScan;
 import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory;
 import com.google.common.collect.ImmutableSet;
 
@@ -200,6 +201,8 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
             options.getBoolean(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER)) {
           ruleBuilder.add(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);
         }
+        ruleBuilder.add(HivePushFilterIntoScan.FILTER_ON_PROJECT);
+        ruleBuilder.add(HivePushFilterIntoScan.FILTER_ON_SCAN);
         return ruleBuilder.build();
       }
       default:
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveCompareFunctionsProcessor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveCompareFunctionsProcessor.java
new file mode 100644
index 0000000000..6a76e7aca4
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveCompareFunctionsProcessor.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.filter;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import java.sql.Timestamp;
+
+public class HiveCompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+
+  private static final ImmutableSet<String> IS_FUNCTIONS_SET;
+
+  private Object value;
+  private PredicateLeaf.Type valueType;
+  private boolean success;
+  private SchemaPath path;
+  private String functionName;
+
+  public HiveCompareFunctionsProcessor(String functionName) {
+    this.success = false;
+    this.functionName = functionName;
+  }
+
+  static {
+    ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+    IS_FUNCTIONS_SET = builder
+        .add(FunctionNames.IS_NOT_NULL)
+        .add("isNotNull")
+        .add("is not null")
+        .add(FunctionNames.IS_NULL)
+        .add("isNull")
+        .add("is null")
+        .add(FunctionNames.IS_TRUE)
+        .add(FunctionNames.IS_NOT_TRUE)
+        .add(FunctionNames.IS_FALSE)
+        .add(FunctionNames.IS_NOT_FALSE)
+        .build();
+
+  }
+
+  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+  static {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+        // binary functions
+        .put(FunctionNames.LIKE, FunctionNames.LIKE)
+        .put(FunctionNames.EQ, FunctionNames.EQ)
+        .put(FunctionNames.NE, FunctionNames.NE)
+        .put(FunctionNames.GE, FunctionNames.LE)
+        .put(FunctionNames.GT, FunctionNames.LT)
+        .put(FunctionNames.LE, FunctionNames.GE)
+        .put(FunctionNames.LT, FunctionNames.GT)
+        .build();
+  }
+
+  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+  static {
+    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
+    VALUE_EXPRESSION_CLASSES = builder
+        .add(BooleanExpression.class)
+        .add(DateExpression.class)
+        .add(DoubleExpression.class)
+        .add(FloatExpression.class)
+        .add(IntExpression.class)
+        .add(LongExpression.class)
+        .add(QuotedString.class)
+        .add(TimeExpression.class)
+        .build();
+  }
+
+  public static boolean isCompareFunction(final String functionName) {
+    return COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName);
+  }
+
+  public static boolean isIsFunction(final String funcName) {
+    return IS_FUNCTIONS_SET.contains(funcName);
+  }
+
+  // shows whether function is simplified IS FALSE
+  public static boolean isNot(final FunctionCall call, final String funcName) {
+    return !call.args().isEmpty()
+        && FunctionNames.NOT.equals(funcName);
+  }
+
+  public static HiveCompareFunctionsProcessor createFunctionsProcessorInstance(final FunctionCall call) {
+    String functionName = call.getName();
+    HiveCompareFunctionsProcessor evaluator = new HiveCompareFunctionsProcessor(functionName);
+
+    return createFunctionsProcessorInstanceInternal(call, evaluator);
+  }
+
+  protected static <T extends HiveCompareFunctionsProcessor> T createFunctionsProcessorInstanceInternal(FunctionCall call, T evaluator) {
+    LogicalExpression nameArg = call.arg(0);
+    LogicalExpression valueArg = call.argCount() >= 2 ? call.arg(1) : null;
+    if (valueArg != null) { // binary function
+      if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+        LogicalExpression swapArg = valueArg;
+        valueArg = nameArg;
+        nameArg = swapArg;
+        evaluator.setFunctionName(COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(evaluator.getFunctionName()));
+      }
+      evaluator.setSuccess(nameArg.accept(evaluator, valueArg));
+    } else if (call.arg(0) instanceof SchemaPath) {
+      evaluator.setPath((SchemaPath) nameArg);
+    }
+    evaluator.setSuccess(true);
+    return evaluator;
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+
+  protected void setSuccess(boolean success) {
+    this.success = success;
+  }
+
+  public SchemaPath getPath() {
+    return path;
+  }
+
+  protected void setPath(SchemaPath path) {
+    this.path = path;
+  }
+
+  public String getFunctionName() {
+    return functionName;
+  }
+
+  protected void setFunctionName(String functionName) {
+    this.functionName = functionName;
+  }
+
+  public Object getValue() {
+    return value;
+  }
+
+  public void setValue(Object value) {
+    this.value = value;
+  }
+
+  public PredicateLeaf.Type getValueType() {
+    return valueType;
+  }
+
+  public void setValueType(PredicateLeaf.Type valueType) {
+    this.valueType = valueType;
+  }
+
+  @Override
+  public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException {
+    if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) {
+      return e.getInput().accept(this, valueArg);
+    }
+    return false;
+  }
+
+  @Override
+  public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
+    return false;
+  }
+
+  @Override
+  public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
+    if (valueArg instanceof QuotedString) {
+      this.value = ((QuotedString) valueArg).getString();
+      this.path = path;
+      this.valueType = PredicateLeaf.Type.STRING;
+      return true;
+    }
+
+    if (valueArg instanceof IntExpression) {
+      int expValue = ((IntExpression) valueArg).getInt();
+      this.value = (long) expValue;
+      this.path = path;
+      this.valueType = PredicateLeaf.Type.LONG;
+      return true;
+    }
+
+    if (valueArg instanceof LongExpression) {
+      this.value = ((LongExpression) valueArg).getLong();
+      this.path = path;
+      this.valueType = PredicateLeaf.Type.LONG;
+      return true;
+    }
+
+    if (valueArg instanceof FloatExpression) {
+      this.value = ((FloatExpression) valueArg).getFloat();
+      this.path = path;
+      this.valueType = PredicateLeaf.Type.FLOAT;
+      return true;
+    }
+
+    if (valueArg instanceof DoubleExpression) {
+      this.value = ((DoubleExpression) valueArg).getDouble();
+      this.path = path;
+      this.valueType = PredicateLeaf.Type.FLOAT;
+      return true;
+    }
+
+    if (valueArg instanceof BooleanExpression) {
+      this.value = ((BooleanExpression) valueArg).getBoolean();
+      this.path = path;
+      this.valueType = PredicateLeaf.Type.BOOLEAN;
+      return true;
+    }
+
+    if (valueArg instanceof DateExpression) {
+      this.value = ((DateExpression) valueArg).getDate();
+      this.path = path;
+      this.valueType = PredicateLeaf.Type.LONG;
+      return true;
+    }
+
+    if (valueArg instanceof TimeStampExpression) {
+      long timeStamp = ((TimeStampExpression) valueArg).getTimeStamp();
+      this.value = new Timestamp(timeStamp);
+      this.path = path;
+      this.valueType = PredicateLeaf.Type.TIMESTAMP;
+      return true;
+    }
+
+    if (valueArg instanceof ValueExpressions.VarDecimalExpression) {
+      // use the exact decimal representation rather than going through a lossy double
+      String decimal = ((ValueExpressions.VarDecimalExpression) valueArg).getBigDecimal().toPlainString();
+      this.value = new HiveDecimalWritable(decimal);
+      this.path = path;
+      this.valueType = PredicateLeaf.Type.DECIMAL;
+      return true;
+    }
+    return false;
+  }
+}
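
A note for reviewers on how the visitor above is driven: the planner hands it a FunctionCall whose first argument is the column and second the literal (or the transposed form, handled by the swap logic). A minimal hand-built example; the FunctionCall construction here is illustrative, since in practice DrillOptiq produces it:

    // imports assumed: org.apache.drill.common.expression.*, java.util.Arrays
    FunctionCall call = new FunctionCall(FunctionNames.EQ,
        Arrays.asList(SchemaPath.getSimplePath("key"),
            new ValueExpressions.IntExpression(1, ExpressionPosition.UNKNOWN)),
        ExpressionPosition.UNKNOWN);
    HiveCompareFunctionsProcessor processor =
        HiveCompareFunctionsProcessor.createFunctionsProcessorInstance(call);
    if (processor.isSuccess()) {
      SchemaPath column = processor.getPath();            // `key`
      Object literal = processor.getValue();              // 1L (int literals widen to long)
      PredicateLeaf.Type type = processor.getValueType(); // PredicateLeaf.Type.LONG
    }
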
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilter.java
new file mode 100644
index 0000000000..46d973696b
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.filter;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl;
+import org.apache.hive.com.esotericsoftware.kryo.Kryo;
+import org.apache.hive.com.esotericsoftware.kryo.io.Input;
+import org.apache.hive.com.esotericsoftware.kryo.io.Output;
+/**
+ * Holder for a <a href="http://en.wikipedia.org/wiki/Sargable">sargable</a>
+ * {@link SearchArgument}, the subset of a query's predicates that can be
+ * pushed down to the RecordReader. Each SearchArgument consists of a series
+ * of search clauses that must each be true for a row to be accepted by the
+ * filter.
+ *
+ * This requires that the filter be normalized into conjunctive normal form
+ * (<a href="http://en.wikipedia.org/wiki/Conjunctive_normal_form">CNF</a>).
+ */
+public class HiveFilter {
+
+  private final SearchArgument searchArgument;
+  private static final ThreadLocal<Kryo> kryo = ThreadLocal.withInitial(Kryo::new);
+
+  public HiveFilter(SearchArgument searchArgument) {
+    this.searchArgument = searchArgument;
+  }
+
+  private static String toKryo(SearchArgument sarg) {
+    Output out = new Output(4 * 1024, 10 * 1024 * 1024);
+    kryo.get().writeObject(out, sarg);
+    out.close();
+    return Base64.encodeBase64String(out.toBytes());
+  }
+
+  @VisibleForTesting
+  public static SearchArgument create(String kryo) {
+    return create(Base64.decodeBase64(kryo));
+  }
+
+  private static SearchArgument create(byte[] kryoBytes) {
+    return kryo.get().readObject(new Input(kryoBytes), SearchArgumentImpl.class);
+  }
+
+  public SearchArgument getSearchArgument() {
+    return searchArgument;
+  }
+
+  public String getSearchArgumentString() {
+    return toKryo(searchArgument);
+  }
+}
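
HiveFilter is essentially a Kryo-plus-Base64 round-trip for a SearchArgument. A minimal sketch of that round-trip, built with the same Hive factory the new HiveFilterBuilder uses (the "key = 1" clause mirrors what the builder emits):

    SearchArgument sarg = SearchArgumentFactory.newBuilder()
        .startAnd().equals("key", PredicateLeaf.Type.LONG, 1L).end()
        .build();
    HiveFilter filter = new HiveFilter(sarg);
    String encoded = filter.getSearchArgumentString();    // Kryo bytes, Base64-encoded
    SearchArgument decoded = HiveFilter.create(encoded);  // as used by HiveScan.toString()
    // decoded.toString() -> "leaf-0 = (EQUALS key 1), expr = leaf-0"
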
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilterBuilder.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilterBuilder.java
new file mode 100644
index 0000000000..92c23b67cc
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilterBuilder.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.filter;
+
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class HiveFilterBuilder extends AbstractExprVisitor<SearchArgument.Builder, Void, RuntimeException> {
+
+  private static final Logger logger = LoggerFactory.getLogger(HiveFilterBuilder.class);
+
+  private final LogicalExpression le;
+  private final SearchArgument.Builder builder;
+  private final HashMap<String, SqlTypeName> dataTypeMap;
+
+  private boolean allExpressionsConverted = false;
+
+  HiveFilterBuilder(final LogicalExpression le, final HashMap<String, SqlTypeName> dataTypeMap) {
+    this.le = le;
+    this.dataTypeMap = dataTypeMap;
+    this.builder = SearchArgumentFactory.newBuilder();
+  }
+
+  public HiveFilter parseTree() {
+    SearchArgument.Builder result = le.accept(this, null);
+    SearchArgument searchArgument = result == null ? builder.build() : result.build();
+    return new HiveFilter(searchArgument);
+  }
+
+  public boolean isAllExpressionsConverted() {
+    return allExpressionsConverted;
+  }
+
+  @Override
+  public SearchArgument.Builder visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
+    allExpressionsConverted = false;
+    return null;
+  }
+
+  @Override
+  public SearchArgument.Builder visitSchemaPath(SchemaPath path, Void value) throws RuntimeException {
+    if (path instanceof FieldReference) {
+      String fieldName = path.getAsNamePart().getName();
+      SqlTypeName sqlTypeName = dataTypeMap.get(fieldName);
+      switch (sqlTypeName) {
+        case BOOLEAN:
+          PredicateLeaf.Type valueType = convertLeafType(sqlTypeName);
+          builder.startNot().equals(fieldName, valueType, false).end();
+          break;
+        default:
+          // otherwise we don't know what to do, so mark the clause as "maybe" (YES_NO_NULL)
+          builder.literal(SearchArgument.TruthValue.YES_NO_NULL);
+      }
+    }
+    return builder;
+  }
+
+  @Override
+  public SearchArgument.Builder visitBooleanOperator(BooleanOperator op, Void value) throws RuntimeException {
+    return visitFunctionCall(op, value);
+  }
+
+  @Override
+  public SearchArgument.Builder visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
+    String functionName = call.getName();
+    List<LogicalExpression> args = call.args();
+    if (HiveCompareFunctionsProcessor.isCompareFunction(functionName)
+        || HiveCompareFunctionsProcessor.isIsFunction(functionName)
+        || HiveCompareFunctionsProcessor.isNot(call, functionName)) {
+      HiveCompareFunctionsProcessor processor = HiveCompareFunctionsProcessor.createFunctionsProcessorInstance(call);
+      if (processor.isSuccess()) {
+        return buildSearchArgument(processor);
+      }
+    } else {
+      switch (functionName) {
+        case FunctionNames.AND:
+          builder.startAnd();
+          break;
+        case FunctionNames.OR:
+          builder.startOr();
+          break;
+        default:
+          logger.warn("Unsupported logical operator:{} for push down", 
functionName);
+          return builder;
+      }
+      for (LogicalExpression arg : args) {
+        arg.accept(this, null);
+      }
+      builder.end();
+    }
+    return builder;
+  }
+
+  private SearchArgument.Builder buildSearchArgument(final HiveCompareFunctionsProcessor processor) {
+    String functionName = processor.getFunctionName();
+    SchemaPath field = processor.getPath();
+    Object fieldValue = processor.getValue();
+    PredicateLeaf.Type valueType = processor.getValueType();
+    SqlTypeName sqlTypeName = dataTypeMap.get(field.getAsNamePart().getName());
+    if (fieldValue == null) {
+      valueType = convertLeafType(sqlTypeName);
+    }
+    if (valueType == null) {
+      return builder;
+    }
+    String columnName = field.getAsNamePart().getName();
+    switch (functionName) {
+      case FunctionNames.EQ:
+        builder.startAnd().equals(columnName, valueType, fieldValue).end();
+        break;
+      case FunctionNames.NE:
+        builder.startNot().equals(columnName, valueType, fieldValue).end();
+        break;
+      case FunctionNames.GE:
+        builder.startNot().lessThan(columnName, valueType, fieldValue).end();
+        break;
+      case FunctionNames.GT:
+        builder.startNot().lessThanEquals(columnName, valueType, fieldValue).end();
+        break;
+      case FunctionNames.LE:
+        builder.startAnd().lessThanEquals(columnName, valueType, fieldValue).end();
+        break;
+      case FunctionNames.LT:
+        builder.startAnd().lessThan(columnName, valueType, fieldValue).end();
+        break;
+      case FunctionNames.IS_NULL:
+      case "isNull":
+      case "is null":
+        builder.startAnd().isNull(columnName, valueType).end();
+        break;
+      case FunctionNames.IS_NOT_NULL:
+      case "isNotNull":
+      case "is not null":
+        builder.startNot().isNull(columnName, valueType).end();
+        break;
+      case FunctionNames.IS_NOT_TRUE:
+      case FunctionNames.IS_FALSE:
+      case FunctionNames.NOT:
+        builder.startNot().equals(columnName, valueType, true).end();
+        break;
+      case FunctionNames.IS_TRUE:
+      case FunctionNames.IS_NOT_FALSE:
+        builder.startNot().equals(columnName, valueType, false).end();
+        break;
+      default:
+        logger.warn("Unsupported compare function: {} for push down", functionName);
+    }
+    return builder;
+  }
+
+  private PredicateLeaf.Type convertLeafType(SqlTypeName sqlTypeName) {
+    PredicateLeaf.Type type = null;
+    switch (sqlTypeName) {
+      case BOOLEAN:
+        type = PredicateLeaf.Type.BOOLEAN;
+        break;
+      case TINYINT:
+      case SMALLINT:
+      case INTEGER:
+      case BIGINT:
+        type = PredicateLeaf.Type.LONG;
+        break;
+      case DOUBLE:
+      case FLOAT:
+        type = PredicateLeaf.Type.FLOAT;
+        break;
+      case DECIMAL:
+        type = PredicateLeaf.Type.DECIMAL;
+        break;
+      case DATE:
+        type = PredicateLeaf.Type.DATE;
+        break;
+      case TIMESTAMP:
+        type = PredicateLeaf.Type.TIMESTAMP;
+        break;
+      case CHAR:
+      case VARCHAR:
+        type = PredicateLeaf.Type.STRING;
+        break;
+      default:
+        logger.warn("Not support push down type:" + sqlTypeName.getName());
+    }
+    return type;
+  }
+}
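
One subtlety worth calling out in buildSearchArgument above: GT/GE are expressed through negation, since the SearchArgument builder only offers lessThan and lessThanEquals leaves. A minimal hand-built equivalent of what the builder produces for "key > 1" (column name and literal are illustrative):

    SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
    b.startNot().lessThanEquals("key", PredicateLeaf.Type.LONG, 1L).end();
    SearchArgument sarg = b.build();
    // sarg.toString() -> "leaf-0 = (LESS_THAN_EQUALS key 1), expr = (not leaf-0)"
    // matching the pattern asserted in testPushDownWithGreaterThanForOrc below
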
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HivePushFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HivePushFilterIntoScan.java
new file mode 100644
index 0000000000..f372521ec7
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HivePushFilterIntoScan.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.filter;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.hive.HiveScan;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg.SARG_PUSHDOWN;
+
+public abstract class HivePushFilterIntoScan extends StoragePluginOptimizerRule {
+
+  private HivePushFilterIntoScan(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+  }
+
+  public static final StoragePluginOptimizerRule FILTER_ON_SCAN =
+      new HivePushFilterIntoScan(RelOptHelper.some(FilterPrel.class,
+          RelOptHelper.any(ScanPrel.class)), "HivePushFilterIntoScan:Filter_On_Scan") {
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final ScanPrel scan = call.rel(1);
+      final FilterPrel filter = call.rel(0);
+      final RexNode condition = filter.getCondition();
+
+      HiveScan groupScan = (HiveScan) scan.getGroupScan();
+      if (groupScan.isFilterPushedDown()) {
+        /*
+         * The rule can get triggered again by the transformed "scan => filter" sequence
+         * created by an earlier execution of this rule when we could not completely
+         * convert the Calcite filter's condition to a Hive filter. In such cases, we
+         * rely on this flag to avoid re-processing the already transformed call.
+         */
+        return;
+      }
+
+      doPushFilterToScan(call, filter, null, scan, groupScan, condition);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      final ScanPrel scan = call.rel(1);
+      if (scan.getGroupScan() instanceof HiveScan) {
+        return super.matches(call);
+      }
+      return false;
+    }
+  };
+
+  public static final StoragePluginOptimizerRule FILTER_ON_PROJECT =
+      new HivePushFilterIntoScan(RelOptHelper.some(FilterPrel.class,
+          RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))),
+          "HivePushFilterIntoScan:Filter_On_Project") {
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final ScanPrel scan = call.rel(2);
+      final ProjectPrel project = call.rel(1);
+      final FilterPrel filter = call.rel(0);
+
+      HiveScan groupScan = (HiveScan) scan.getGroupScan();
+      if (groupScan.isFilterPushedDown()) {
+        /*
+         * The rule can get triggered again by the transformed "scan => filter" sequence
+         * created by an earlier execution of this rule when we could not completely
+         * convert the Calcite filter's condition to a Hive filter. In such cases, we
+         * rely on this flag to avoid re-processing the already transformed call.
+         */
+        return;
+      }
+
+      // convert the filter to one that references the child of the project
+      final RexNode condition = RelOptUtil.pushPastProject(filter.getCondition(), project);
+
+      doPushFilterToScan(call, filter, project, scan, groupScan, condition);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      final ScanPrel scan = call.rel(2);
+      if (scan.getGroupScan() instanceof HiveScan) {
+        return super.matches(call);
+      }
+      return false;
+    }
+  };
+
+
+  protected void doPushFilterToScan(final RelOptRuleCall call, final FilterPrel filter,
+      final ProjectPrel project, final ScanPrel scan, final HiveScan groupScan,
+      final RexNode condition) {
+
+    HashMap<String, SqlTypeName> dataTypeMap = new HashMap<>();
+    List<RelDataTypeField> fieldList = scan.getRowType().getFieldList();
+    for (RelDataTypeField relDataTypeField : fieldList) {
+      String name = relDataTypeField.getName();
+      SqlTypeName sqlTypeName = relDataTypeField.getValue().getSqlTypeName();
+      dataTypeMap.put(name, sqlTypeName);
+    }
+
+    final LogicalExpression conditionExp = DrillOptiq.toDrill(
+        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
+    final HiveFilterBuilder hiveFilterBuilder = new HiveFilterBuilder(conditionExp, dataTypeMap);
+    final HiveFilter newScanSpec = hiveFilterBuilder.parseTree();
+
+    if (newScanSpec == null) {
+      return; //no filter pushdown ==> No transformation.
+    }
+
+    String searchArgument = newScanSpec.getSearchArgumentString();
+    Map<String, String> confProperties = groupScan.getConfProperties();
+    confProperties.put(SARG_PUSHDOWN, searchArgument);
+    HiveScan newGroupsScan;
+    try {
+      newGroupsScan = new HiveScan(groupScan.getUserName(), groupScan.getHiveReadEntry(),
+          groupScan.getStoragePlugin(), groupScan.getColumns(), null, confProperties, groupScan.getMaxRecords());
+    } catch (ExecutionSetupException e) {
+      // the HiveScan constructor declares ExecutionSetupException; rethrow unchecked
+      throw new RuntimeException(e);
+    }
+    newGroupsScan.setFilterPushedDown(true);
+
+    final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(),
+        newGroupsScan, scan.getRowType(), scan.getTable());
+    // Depending on whether there is a project in the middle, assign either the scan
+    // or a copy of the project to childRel.
+    final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(),
+        ImmutableList.of(newScanPrel));
+    // Keep the original filter on top of the new scan, since we may not have converted
+    // the entire filter condition expression into a Hive ORC filter.
+    call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
+  }
+}
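
The hand-off to the readers happens entirely through the SARG_PUSHDOWN conf property set in doPushFilterToScan above. A sketch of the reader-side decode, assuming Hive's ConvertAstToSearchArg.createFromConf is available (this patch does not call it itself; Hive's ORC input format performs the equivalent lookup internally):

    // imports assumed: org.apache.hadoop.conf.Configuration, org.apache.hadoop.hive.ql.io.sarg.*
    SearchArgument pushed = SearchArgumentFactory.newBuilder()
        .startAnd().equals("key", PredicateLeaf.Type.LONG, 1L).end().build();
    Configuration conf = new Configuration();
    conf.set(ConvertAstToSearchArg.SARG_PUSHDOWN, new HiveFilter(pushed).getSearchArgumentString());
    SearchArgument decoded = ConvertAstToSearchArg.createFromConf(conf);
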
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveFilterPushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveFilterPushDown.java
new file mode 100644
index 0000000000..b821ae5a3b
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveFilterPushDown.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec;
+
+import org.apache.drill.categories.HiveStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.hive.HiveTestBase;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({SlowTest.class, HiveStorageTest.class})
+public class TestHiveFilterPushDown extends HiveTestBase {
+
+    @BeforeClass
+    public static void init() {
+        // disable the native Parquet reader so the Hive filter pushdown path is exercised
+        setSessionOption(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER, false);
+        setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
+    }
+
+    @AfterClass
+    public static void cleanup() {
+        resetSessionOption(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER);
+        resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+    }
+
+    @Test
+    public void testPushDownWithEqualForOrc() throws Exception {
+        String query = "select * from hive.orc_push_down where key = 1";
+
+        int actualRowCount = testSql(query);
+        assertEquals("Expected and actual row count should match", 4, 
actualRowCount);
+
+        testPlanMatchingPatterns(query,
+                new String[]{"Filter\\(condition=\\[=\\($0, 1\\)\\]\\)",
+                        "SearchArgument=leaf-0 = \\(EQUALS key 1\\), expr = 
leaf-0"},
+                new String[]{});
+    }
+
+    @Test
+    public void testPushDownWithNotEqualForOrc() throws Exception {
+        String query = "select * from hive.orc_push_down where key <> 1";
+
+        int actualRowCount = testSql(query);
+        assertEquals("Expected and actual row count should match", 2, 
actualRowCount);
+
+        testPlanMatchingPatterns(query,
+                new String[]{"Filter\\(condition=\\[=\\<\\>\\($0, 1\\)\\]\\)",
+                        "SearchArgument=leaf-0 = \\(EQUALS key 1\\), expr = 
\\(not leaf-0\\)"},
+                new String[]{});
+    }
+
+    @Test
+    public void testPushDownWithGreaterThanForOrc() throws Exception {
+        String query = "select * from hive.orc_push_down where key > 1";
+
+        int actualRowCount = testSql(query);
+        assertEquals("Expected and actual row count should match", 2, 
actualRowCount);
+
+        testPlanMatchingPatterns(query,
+                new String[]{"Filter\\(condition=\\[=\\>\\($0, 1\\)\\]\\)",
+                        "SearchArgument=leaf-0 = \\(LESS_THAN_EQUALS key 1\\), 
expr = \\(not leaf-0\\)"},
+                new String[]{});
+    }
+
+    @Test
+    public void testPushDownWithLessThanForOrc() throws Exception {
+        String query = "select * from hive.orc_push_down where key < 2";
+
+        int actualRowCount = testSql(query);
+        assertEquals("Expected and actual row count should match", 4, 
actualRowCount);
+
+        testPlanMatchingPatterns(query,
+                new String[]{"Filter\\(condition=\\[=\\<\\($0, 2\\)\\]\\)",
+                        "SearchArgument=leaf-0 = \\(LESS_THAN key 2\\), expr = 
leaf-0"},
+                new String[]{});
+    }
+
+    @Test
+    public void testPushDownWithAndForOrc() throws Exception {
+        String query = "select * from hive.orc_push_down where key = 2 and 
var_key = 'var_6'";
+
+        int actualRowCount = testSql(query);
+        assertEquals("Expected and actual row count should match", 1, 
actualRowCount);
+
+        testPlanMatchingPatterns(query,
+                new String[]{"Filter\\(condition=\\[AND\\(=\\($0, 2\\), 
=\\($2, 'var_6'\\)\\)\\]\\)",
+                        "SearchArgument=leaf-0 = \\(EQUALS key 2\\), leaf-1 = 
\\(EQUALS var_key var_6\\), expr = \\(and leaf-0 leaf-1\\)"},
+                new String[]{});
+    }
+
+    @Test
+    public void testPushDownWithOrForOrc() throws Exception {
+        String query = "select * from hive.orc_push_down where key = 2 and 
var_key = 'var_1'";
+
+        int actualRowCount = testSql(query);
+        assertEquals("Expected and actual row count should match", 3, 
actualRowCount);
+
+        testPlanMatchingPatterns(query,
+                new String[]{"Filter\\(condition=\\[OR\\(=\\($0, 2\\), =\\($2, 
'var_1'\\)\\)\\]\\)",
+                        "SearchArgument=leaf-0 = \\(EQUALS key 2\\), leaf-1 = 
\\(EQUALS var_key var_1\\), expr = \\(or leaf-0 leaf-1\\)"},
+                new String[]{});
+    }
+}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 106fb22963..0a8b7bc3d9 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -454,8 +454,21 @@ public class HiveTestDataGenerator {
         "INNER JOIN kv ON kv.key = dk.key");
 
     createTableWithEmptyParquet(hiveDriver);
+    createTestDataForDrillORCPushDownReaderTests(hiveDriver);
   }
 
+  private void createTestDataForDrillORCPushDownReaderTests(Driver hiveDriver) {
+    // Hive managed table that has data qualified for Drill ORC filter push down
+    executeQuery(hiveDriver, "create table orc_push_down(key int, int_key int, var_key varchar(10), dec_key decimal(5, 2), boolean_key boolean) stored as orc");
+    // each insert lands in a separate file
+    executeQuery(hiveDriver, "insert into table orc_push_down values (1, 1, 'var_1', 1.11, true), (1, 2, 'var_2', 2.22, false)");
+    executeQuery(hiveDriver, "insert into table orc_push_down values (1, 3, 'var_3', 3.33, true), (1, 4, 'var_4', 4.44, true)");
+    executeQuery(hiveDriver, "insert into table orc_push_down values (2, 5, 'var_5', 5.55, false), (2, 6, 'var_6', 6.66, true)");
+    executeQuery(hiveDriver, "insert into table orc_push_down values (null, 7, 'var_7', 7.77, false), (null, 8, 'var_8', 8.88, false)");
+  }
+
   private void createTestDataForDrillNativeParquetReaderTests(Driver hiveDriver) {
     // Hive managed table that has data qualified for Drill native filter push down
     executeQuery(hiveDriver, "create table kv_native(key int, int_key int, var_key varchar(10), dec_key decimal(5, 2)) stored as parquet");

