This is an automated email from the ASF dual-hosted git repository.

rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d6326f06c8 [GLUTEN-6666][VL] Use custom 
SparkExprToSubfieldFilterParser (#6754)
d6326f06c8 is described below

commit d6326f06c8158de2eb08bb9ba7e3e4eb6e3964ef
Author: Rui Mo <[email protected]>
AuthorDate: Wed Nov 13 23:39:08 2024 +0800

    [GLUTEN-6666][VL] Use custom SparkExprToSubfieldFilterParser (#6754)
    
    Removes the filter-separation code from Gluten. With a custom filter parser
    registered, we can rely on the filter extraction provided by Velox.
---
 cpp/velox/CMakeLists.txt                           |    1 +
 cpp/velox/compute/VeloxBackend.cc                  |    3 +
 .../functions/SparkExprToSubfieldFilterParser.cc   |  103 ++
 .../functions/SparkExprToSubfieldFilterParser.h    |   37 +
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        | 1174 +-------------------
 cpp/velox/substrait/SubstraitToVeloxPlan.h         |  341 ------
 .../tests/Substrait2VeloxPlanConversionTest.cc     |   10 +-
 ep/build-velox/src/get_velox.sh                    |    2 +-
 8 files changed, 151 insertions(+), 1520 deletions(-)

diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 586c08653d..329da49497 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -168,6 +168,7 @@ set(VELOX_SRCS
     memory/VeloxMemoryManager.cc
     operators/functions/RegistrationAllFunctions.cc
     operators/functions/RowConstructorWithNull.cc
+    operators/functions/SparkExprToSubfieldFilterParser.cc
     operators/serializer/VeloxColumnarToRowConverter.cc
     operators/serializer/VeloxColumnarBatchSerializer.cc
     operators/serializer/VeloxRowToColumnarConverter.cc
diff --git a/cpp/velox/compute/VeloxBackend.cc 
b/cpp/velox/compute/VeloxBackend.cc
index c1e907bea1..889f979b28 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -33,6 +33,7 @@
 #include "compute/VeloxRuntime.h"
 #include "config/VeloxConfig.h"
 #include "jni/JniFileSystem.h"
+#include "operators/functions/SparkExprToSubfieldFilterParser.h"
 #include "udf/UdfLoader.h"
 #include "utils/Exception.h"
 #include "velox/common/caching/SsdCache.h"
@@ -155,6 +156,8 @@ void VeloxBackend::init(const 
std::unordered_map<std::string, std::string>& conf
   velox::parquet::registerParquetReaderFactory();
   velox::parquet::registerParquetWriterFactory();
   velox::orc::registerOrcReaderFactory();
+  velox::exec::ExprToSubfieldFilterParser::registerParserFactory(
+      []() { return std::make_shared<SparkExprToSubfieldFilterParser>(); });
 
   // Register Velox functions
   registerAllFunctions();
diff --git a/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.cc 
b/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.cc
new file mode 100644
index 0000000000..8ad537d816
--- /dev/null
+++ b/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.cc
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "operators/functions/SparkExprToSubfieldFilterParser.h"
+
+namespace gluten {
+
+using namespace facebook::velox;
+
+bool SparkExprToSubfieldFilterParser::toSparkSubfield(const core::ITypedExpr* 
field, common::Subfield& subfield) { // Fills 'subfield' from 'field'; returns false when 'field' is rejected.
+  std::vector<std::unique_ptr<common::Subfield::PathElement>> path;
+  for (auto* current = field;;) { // 'current' is never advanced: every path below returns or breaks on the first pass.
+    if (auto* fieldAccess = dynamic_cast<const 
core::FieldAccessTypedExpr*>(current)) {
+      
path.push_back(std::make_unique<common::Subfield::NestedField>(fieldAccess->name()));
+    } else if (dynamic_cast<const core::DereferenceTypedExpr*>(current)) { // Dereference expressions are always rejected.
+      return false;
+    } else if (dynamic_cast<const core::InputTypedExpr*>(current) == nullptr) { // Neither field access nor input: rejected.
+      return false;
+    } else { // InputTypedExpr: accepted, 'path' stays empty.
+      break;
+    }
+
+    if (!current->inputs().empty()) { // A field access with inputs of its own is rejected (presumably a nested field - confirm).
+      return false;
+    } else {
+      break;
+    }
+  }
+  std::reverse(path.begin(), path.end()); // 'path' has at most one element here, so this is a no-op. NOTE(review): <algorithm> is not included directly.
+  subfield = common::Subfield(std::move(path));
+  return true;
+}
+
+std::unique_ptr<common::Filter> 
SparkExprToSubfieldFilterParser::leafCallToSubfieldFilter( // Maps one Spark call to a filter; returns nullptr when it cannot.
+    const core::CallTypedExpr& call,
+    common::Subfield& subfield, // Out: filled by toSparkSubfield from the call's first input.
+    core::ExpressionEvaluator* evaluator,
+    bool negated) { // When true, the call is treated as negated (e.g. equalto yields makeNotEqualFilter).
+  if (call.inputs().empty()) { // No arguments: nothing to build a filter from.
+    return nullptr;
+  }
+
+  const auto* leftSide = call.inputs()[0].get(); // Only the first input is ever tried as the column side.
+
+  if (call.name() == "equalto") { // NOTE(review): the comparison and 'in' branches read inputs()[1] with no size check - assumes two inputs; confirm.
+    if (toSparkSubfield(leftSide, subfield)) {
+      return negated ? makeNotEqualFilter(call.inputs()[1], evaluator) : 
makeEqualFilter(call.inputs()[1], evaluator);
+    }
+  } else if (call.name() == "lessthanorequal") {
+    if (toSparkSubfield(leftSide, subfield)) {
+      return negated ? makeGreaterThanFilter(call.inputs()[1], evaluator)
+                     : makeLessThanOrEqualFilter(call.inputs()[1], evaluator);
+    }
+  } else if (call.name() == "lessthan") {
+    if (toSparkSubfield(leftSide, subfield)) {
+      return negated ? makeGreaterThanOrEqualFilter(call.inputs()[1], 
evaluator)
+                     : makeLessThanFilter(call.inputs()[1], evaluator);
+    }
+  } else if (call.name() == "greaterthanorequal") {
+    if (toSparkSubfield(leftSide, subfield)) {
+      return negated ? makeLessThanFilter(call.inputs()[1], evaluator)
+                     : makeGreaterThanOrEqualFilter(call.inputs()[1], 
evaluator);
+    }
+  } else if (call.name() == "greaterthan") {
+    if (toSparkSubfield(leftSide, subfield)) {
+      return negated ? makeLessThanOrEqualFilter(call.inputs()[1], evaluator)
+                     : makeGreaterThanFilter(call.inputs()[1], evaluator);
+    }
+  } else if (call.name() == "in") {
+    if (toSparkSubfield(leftSide, subfield)) {
+      return makeInFilter(call.inputs()[1], evaluator, negated); // 'negated' is forwarded rather than handled here.
+    }
+  } else if (call.name() == "isnull") { // The null checks use only the first input.
+    if (toSparkSubfield(leftSide, subfield)) {
+      if (negated) {
+        return exec::isNotNull();
+      }
+      return exec::isNull();
+    }
+  } else if (call.name() == "isnotnull") {
+    if (toSparkSubfield(leftSide, subfield)) {
+      if (negated) {
+        return exec::isNull();
+      }
+      return exec::isNotNull();
+    }
+  }
+  return nullptr; // Unsupported function name, or toSparkSubfield rejected the first input.
+}
+} // namespace gluten
diff --git a/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.h 
b/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.h
new file mode 100644
index 0000000000..d050091c07
--- /dev/null
+++ b/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.h
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "velox/expression/ExprToSubfieldFilter.h" // NOTE(review): this new header has no '#pragma once' or include guard.
+
+namespace gluten {
+
+/// Parses Spark expression into subfield filter. Differences from Presto's 
parser include:
+/// 1) Some Spark functions are registered under different names (e.g. "equalto", "isnotnull").
+/// 2) The supported functions vary.
+/// 3) Filter push-down on nested fields is disabled.
+class SparkExprToSubfieldFilterParser : public 
facebook::velox::exec::ExprToSubfieldFilterParser {
+ public:
+  std::unique_ptr<facebook::velox::common::Filter> leafCallToSubfieldFilter(
+      const facebook::velox::core::CallTypedExpr& call,
+      facebook::velox::common::Subfield& subfield,
+      facebook::velox::core::ExpressionEvaluator* evaluator,
+      bool negated) override; // Returns nullptr when 'call' cannot be converted to a filter.
+
+ private:
+  // Compared to the upstream 'toSubfield', the push-down of filter on nested 
field is disabled.
+  bool toSparkSubfield(const facebook::velox::core::ITypedExpr* field, 
facebook::velox::common::Subfield& subfield); // Returns false when 'field' is rejected.
+};
+} // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 9e29590433..cdd9269e14 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -73,45 +73,6 @@ EmitInfo getEmitInfo(const ::substrait::RelCommon& 
relCommon, const core::PlanNo
   return emitInfo;
 }
 
-template <typename T>
-// Get the lowest value for numeric type.
-T getLowest() {
-  return std::numeric_limits<T>::lowest();
-}
-
-// Get the lowest value for string.
-template <>
-std::string getLowest<std::string>() {
-  return "";
-}
-
-// Get the max value for numeric type.
-template <typename T>
-T getMax() {
-  return std::numeric_limits<T>::max();
-}
-
-// The max value will be used in BytesRange. Return empty string here instead.
-template <>
-std::string getMax<std::string>() {
-  return "";
-}
-
-// Substrait function names.
-const std::string sIsNotNull = "is_not_null";
-const std::string sIsNull = "is_null";
-const std::string sGte = "gte";
-const std::string sGt = "gt";
-const std::string sLte = "lte";
-const std::string sLt = "lt";
-const std::string sEqual = "equal";
-const std::string sOr = "or";
-const std::string sNot = "not";
-
-// Substrait types.
-const std::string sI32 = "i32";
-const std::string sI64 = "i64";
-
 /// @brief Get the input type from both sides of join.
 /// @param leftNode the plan node of left side.
 /// @param rightNode the plan node of right side.
@@ -1190,37 +1151,10 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
     tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
         kHiveConnectorId, "hive_table", filterPushdownEnabled, 
connector::hive::SubfieldFilters{}, nullptr);
   } else {
-    // Flatten the conditions connected with 'and'.
-    std::vector<::substrait::Expression_ScalarFunction> scalarFunctions;
-    std::vector<::substrait::Expression_SingularOrList> singularOrLists;
-    std::vector<::substrait::Expression_IfThen> ifThens;
-    flattenConditions(readRel.filter(), scalarFunctions, singularOrLists, 
ifThens);
-
-    // The vector's subscript stands for the column index.
-    std::vector<RangeRecorder> rangeRecorders(veloxTypeList.size());
-
-    // Separate the filters to be two parts. The subfield part can be
-    // pushed down.
-    std::vector<::substrait::Expression_ScalarFunction> subfieldFunctions;
-    std::vector<::substrait::Expression_ScalarFunction> remainingFunctions;
-    std::vector<::substrait::Expression_SingularOrList> subfieldOrLists;
-    std::vector<::substrait::Expression_SingularOrList> remainingOrLists;
-
-    separateFilters(
-        rangeRecorders,
-        scalarFunctions,
-        subfieldFunctions,
-        remainingFunctions,
-        singularOrLists,
-        subfieldOrLists,
-        remainingOrLists,
-        veloxTypeList,
-        splitInfo->format);
-
-    // Create subfield filters based on the constructed filter info map.
-    auto subfieldFilters = createSubfieldFilters(colNameList, veloxTypeList, 
subfieldFunctions, subfieldOrLists);
-    // Connect the remaining filters with 'and'.
-    auto remainingFilter = connectWithAnd(colNameList, veloxTypeList, 
remainingFunctions, remainingOrLists, ifThens);
+    connector::hive::SubfieldFilters subfieldFilters;
+    auto names = colNameList;
+    auto types = veloxTypeList;
+    auto remainingFilter = exprConverter_->toVeloxExpr(readRel.filter(), 
ROW(std::move(names), std::move(types)));
 
     tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
         kHiveConnectorId, "hive_table", filterPushdownEnabled, 
std::move(subfieldFilters), remainingFilter);
@@ -1386,39 +1320,6 @@ void 
SubstraitToVeloxPlanConverter::constructFunctionMap(const ::substrait::Plan
   exprConverter_ = std::make_unique<SubstraitVeloxExprConverter>(pool_, 
functionMap_);
 }
 
-void SubstraitToVeloxPlanConverter::flattenConditions(
-    const ::substrait::Expression& substraitFilter,
-    std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
-    std::vector<::substrait::Expression_SingularOrList>& singularOrLists,
-    std::vector<::substrait::Expression_IfThen>& ifThens) {
-  auto typeCase = substraitFilter.rex_type_case();
-  switch (typeCase) {
-    case ::substrait::Expression::RexTypeCase::kScalarFunction: {
-      const auto& sFunc = substraitFilter.scalar_function();
-      auto filterNameSpec = SubstraitParser::findFunctionSpec(functionMap_, 
sFunc.function_reference());
-      // TODO: Only and relation is supported here.
-      if (SubstraitParser::getNameBeforeDelimiter(filterNameSpec) == "and") {
-        for (const auto& sCondition : sFunc.arguments()) {
-          flattenConditions(sCondition.value(), scalarFunctions, 
singularOrLists, ifThens);
-        }
-      } else {
-        scalarFunctions.emplace_back(sFunc);
-      }
-      break;
-    }
-    case ::substrait::Expression::RexTypeCase::kSingularOrList: {
-      singularOrLists.emplace_back(substraitFilter.singular_or_list());
-      break;
-    }
-    case ::substrait::Expression::RexTypeCase::kIfThen: {
-      ifThens.emplace_back(substraitFilter.if_then());
-      break;
-    }
-    default:
-      VELOX_NYI("GetFlatConditions not supported for type '{}'", 
std::to_string(typeCase));
-  }
-}
-
 std::string SubstraitToVeloxPlanConverter::findFuncSpec(uint64_t id) {
   return SubstraitParser::findFunctionSpec(functionMap_, id);
 }
@@ -1481,878 +1382,6 @@ void SubstraitToVeloxPlanConverter::extractJoinKeys(
   }
 }
 
-connector::hive::SubfieldFilters 
SubstraitToVeloxPlanConverter::createSubfieldFilters(
-    const std::vector<std::string>& inputNameList,
-    const std::vector<TypePtr>& inputTypeList,
-    const std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
-    const std::vector<::substrait::Expression_SingularOrList>& 
singularOrLists) {
-  // The vector's subscript stands for the column index.
-  std::vector<FilterInfo> columnToFilterInfo(inputTypeList.size());
-
-  // Process scalarFunctions.
-  for (const auto& scalarFunction : scalarFunctions) {
-    auto filterNameSpec = SubstraitParser::findFunctionSpec(functionMap_, 
scalarFunction.function_reference());
-    auto filterName = SubstraitParser::getNameBeforeDelimiter(filterNameSpec);
-
-    if (filterName == sNot) {
-      VELOX_CHECK(scalarFunction.arguments().size() == 1);
-      auto expr = scalarFunction.arguments()[0].value();
-      if (expr.has_scalar_function()) {
-        // Set its child to filter info with reverse enabled.
-        setFilterInfo(expr.scalar_function(), inputTypeList, 
columnToFilterInfo, true);
-      } else if (expr.has_singular_or_list()) {
-        auto singularOrList = expr.singular_or_list();
-        setFilterInfo(singularOrList, columnToFilterInfo, true);
-      } else {
-        VELOX_NYI("Only support push down Not with scalar function or In.");
-      }
-    } else if (filterName == sOr) {
-      VELOX_CHECK(scalarFunction.arguments().size() == 2);
-      VELOX_CHECK(std::all_of(
-          scalarFunction.arguments().cbegin(),
-          scalarFunction.arguments().cend(),
-          [](const ::substrait::FunctionArgument& arg) {
-            return arg.value().has_scalar_function() || 
arg.value().has_singular_or_list();
-          }));
-
-      // Set the children functions to filter info. They should be
-      // effective to the same field.
-      for (const auto& arg : scalarFunction.arguments()) {
-        const auto& expr = arg.value();
-        if (expr.has_scalar_function()) {
-          setFilterInfo(arg.value().scalar_function(), inputTypeList, 
columnToFilterInfo);
-        } else if (expr.has_singular_or_list()) {
-          setFilterInfo(expr.singular_or_list(), columnToFilterInfo);
-        } else {
-          VELOX_NYI("Scalar function or SingularOrList expected.");
-        }
-      }
-    } else {
-      setFilterInfo(scalarFunction, inputTypeList, columnToFilterInfo);
-    }
-  }
-
-  // Process singularOrLists.
-  for (const auto& list : singularOrLists) {
-    setFilterInfo(list, columnToFilterInfo);
-  }
-
-  return mapToFilters(inputNameList, inputTypeList, columnToFilterInfo);
-}
-
-bool SubstraitToVeloxPlanConverter::fieldOrWithLiteral(
-    const ::google::protobuf::RepeatedPtrField<::substrait::FunctionArgument>& 
arguments,
-    uint32_t& fieldIndex) {
-  if (arguments.size() == 1) {
-    if (arguments[0].value().has_selection()) {
-      // Only field exists.
-      return 
SubstraitParser::parseReferenceSegment(arguments[0].value().selection().direct_reference(),
 fieldIndex);
-    } else {
-      return false;
-    }
-  }
-
-  if (arguments.size() != 2) {
-    // Not the field and literal combination.
-    return false;
-  }
-  bool fieldExists = false;
-  bool literalExists = false;
-  for (const auto& param : arguments) {
-    auto typeCase = param.value().rex_type_case();
-    switch (typeCase) {
-      case ::substrait::Expression::RexTypeCase::kSelection: {
-        if 
(!SubstraitParser::parseReferenceSegment(param.value().selection().direct_reference(),
 fieldIndex)) {
-          return false;
-        }
-        fieldExists = true;
-        break;
-      }
-      case ::substrait::Expression::RexTypeCase::kLiteral: {
-        literalExists = true;
-        break;
-      }
-      default:
-        break;
-    }
-  }
-  // Whether the field and literal both exist.
-  return fieldExists && literalExists;
-}
-
-bool SubstraitToVeloxPlanConverter::childrenFunctionsOnSameField(
-    const ::substrait::Expression_ScalarFunction& function) {
-  // Get the column indices of the children functions.
-  std::vector<uint32_t> colIndices;
-  for (const auto& arg : function.arguments()) {
-    if (arg.value().has_scalar_function()) {
-      const auto& scalarFunction = arg.value().scalar_function();
-      for (const auto& param : scalarFunction.arguments()) {
-        if (param.value().has_selection()) {
-          const auto& field = param.value().selection();
-          VELOX_CHECK(field.has_direct_reference());
-          uint32_t colIdx;
-          if 
(!SubstraitParser::parseReferenceSegment(field.direct_reference(), colIdx)) {
-            return false;
-          }
-          colIndices.emplace_back(colIdx);
-        }
-      }
-    } else if (arg.value().has_singular_or_list()) {
-      const auto& singularOrList = arg.value().singular_or_list();
-      
colIndices.emplace_back(getColumnIndexFromSingularOrList(singularOrList));
-    } else {
-      return false;
-    }
-  }
-
-  if (std::all_of(colIndices.begin(), colIndices.end(), [&](uint32_t idx) { 
return idx == colIndices[0]; })) {
-    // All indices are the same.
-    return true;
-  }
-  return false;
-}
-
-bool SubstraitToVeloxPlanConverter::canPushdownFunction(
-    const ::substrait::Expression_ScalarFunction& scalarFunction,
-    const std::string& filterName,
-    uint32_t& fieldIdx) {
-  // Condtions can be pushed down.
-  static const std::unordered_set<std::string> supportedFunctions = 
{sIsNotNull, sIsNull, sGte, sGt, sLte, sLt, sEqual};
-
-  bool canPushdown = false;
-  if (supportedFunctions.find(filterName) != supportedFunctions.end() &&
-      fieldOrWithLiteral(scalarFunction.arguments(), fieldIdx)) {
-    // The arg should be field or field with literal.
-    canPushdown = true;
-  }
-  return canPushdown;
-}
-
-bool SubstraitToVeloxPlanConverter::canPushdownNot(
-    const ::substrait::Expression_ScalarFunction& scalarFunction,
-    std::vector<RangeRecorder>& rangeRecorders) {
-  VELOX_CHECK(scalarFunction.arguments().size() == 1, "Only one arg is 
expected for Not.");
-  const auto& notArg = scalarFunction.arguments()[0];
-  if (notArg.value().has_singular_or_list()) {
-    auto singularOrList = notArg.value().singular_or_list();
-    if (!canPushdownSingularOrList(singularOrList)) {
-      return false;
-    }
-    uint32_t colIdx = getColumnIndexFromSingularOrList(singularOrList);
-    return rangeRecorders.at(colIdx).setInRange();
-  } else if (notArg.value().has_scalar_function()) {
-    auto argFunction =
-        SubstraitParser::findFunctionSpec(functionMap_, 
notArg.value().scalar_function().function_reference());
-    auto functionName = SubstraitParser::getNameBeforeDelimiter(argFunction);
-
-    static const std::unordered_set<std::string> supportedNotFunctions = 
{sGte, sGt, sLte, sLt, sEqual};
-
-    uint32_t fieldIdx;
-    bool isFieldOrWithLiteral = 
fieldOrWithLiteral(notArg.value().scalar_function().arguments(), fieldIdx);
-
-    return (
-        supportedNotFunctions.find(functionName) != 
supportedNotFunctions.end() && isFieldOrWithLiteral &&
-        rangeRecorders.at(fieldIdx).setCertainRangeForFunction(functionName, 
true /*reverse*/));
-  }
-  return false;
-}
-
-bool SubstraitToVeloxPlanConverter::canPushdownOr(
-    const ::substrait::Expression_ScalarFunction& scalarFunction,
-    std::vector<RangeRecorder>& rangeRecorders) {
-  // OR Conditon whose children functions are on different columns is not
-  // supported to be pushed down.
-  if (!childrenFunctionsOnSameField(scalarFunction)) {
-    return false;
-  }
-
-  static const std::unordered_set<std::string> supportedOrFunctions = 
{sIsNotNull, sGte, sGt, sLte, sLt, sEqual};
-
-  for (const auto& arg : scalarFunction.arguments()) {
-    if (arg.value().has_scalar_function()) {
-      auto nameSpec =
-          SubstraitParser::findFunctionSpec(functionMap_, 
arg.value().scalar_function().function_reference());
-      auto functionName = SubstraitParser::getNameBeforeDelimiter(nameSpec);
-
-      uint32_t fieldIdx;
-      bool isFieldOrWithLiteral = 
fieldOrWithLiteral(arg.value().scalar_function().arguments(), fieldIdx);
-      if (supportedOrFunctions.find(functionName) == 
supportedOrFunctions.end() || !isFieldOrWithLiteral ||
-          !rangeRecorders.at(fieldIdx).setCertainRangeForFunction(
-              functionName, false /*reverse*/, true /*forOrRelation*/)) {
-        // The arg should be field or field with literal.
-        return false;
-      }
-    } else if (arg.value().has_singular_or_list()) {
-      const auto& singularOrList = arg.value().singular_or_list();
-      if (!canPushdownSingularOrList(singularOrList, true)) {
-        return false;
-      }
-      uint32_t fieldIdx = getColumnIndexFromSingularOrList(singularOrList);
-      // Disable IN pushdown for int-like types.
-      if (!rangeRecorders.at(fieldIdx).setInRange(true /*forOrRelation*/)) {
-        return false;
-      }
-    } else {
-      // Or relation betweeen other expressions is not supported to be pushded
-      // down currently.
-      return false;
-    }
-  }
-  return true;
-}
-
-void SubstraitToVeloxPlanConverter::separateFilters(
-    std::vector<RangeRecorder>& rangeRecorders,
-    const std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
-    std::vector<::substrait::Expression_ScalarFunction>& subfieldFunctions,
-    std::vector<::substrait::Expression_ScalarFunction>& remainingFunctions,
-    const std::vector<::substrait::Expression_SingularOrList>& singularOrLists,
-    std::vector<::substrait::Expression_SingularOrList>& subfieldOrLists,
-    std::vector<::substrait::Expression_SingularOrList>& remainingOrLists,
-    const std::vector<TypePtr>& veloxTypeList,
-    const dwio::common::FileFormat& format) {
-  for (const auto& singularOrList : singularOrLists) {
-    if (!canPushdownSingularOrList(singularOrList)) {
-      remainingOrLists.emplace_back(singularOrList);
-      continue;
-    }
-    uint32_t colIdx = getColumnIndexFromSingularOrList(singularOrList);
-    if (rangeRecorders.at(colIdx).setInRange()) {
-      subfieldOrLists.emplace_back(singularOrList);
-    } else {
-      remainingOrLists.emplace_back(singularOrList);
-    }
-  }
-
-  for (const auto& scalarFunction : scalarFunctions) {
-    auto filterNameSpec = SubstraitParser::findFunctionSpec(functionMap_, 
scalarFunction.function_reference());
-    auto filterName = SubstraitParser::getNameBeforeDelimiter(filterNameSpec);
-    // Add all decimal filters to remaining functions because their pushdown 
are not supported.
-    if (format == dwio::common::FileFormat::ORC && 
scalarFunction.arguments().size() > 0) {
-      auto value = scalarFunction.arguments().at(0).value();
-      if (value.has_selection()) {
-        uint32_t fieldIndex;
-        bool parsed = 
SubstraitParser::parseReferenceSegment(value.selection().direct_reference(), 
fieldIndex);
-        if (!parsed || (!veloxTypeList.empty() && 
veloxTypeList.at(fieldIndex)->isDecimal())) {
-          remainingFunctions.emplace_back(scalarFunction);
-          continue;
-        }
-      }
-    }
-
-    // Check whether NOT and OR functions can be pushed down.
-    // If yes, the scalar function will be added into the subfield functions.
-    if (filterName == sNot) {
-      if (canPushdownNot(scalarFunction, rangeRecorders)) {
-        subfieldFunctions.emplace_back(scalarFunction);
-      } else {
-        remainingFunctions.emplace_back(scalarFunction);
-      }
-    } else if (filterName == sOr) {
-      if (canPushdownOr(scalarFunction, rangeRecorders)) {
-        subfieldFunctions.emplace_back(scalarFunction);
-      } else {
-        remainingFunctions.emplace_back(scalarFunction);
-      }
-    } else {
-      // Check if the condition is supported to be pushed down.
-      uint32_t fieldIdx;
-      if (canPushdownFunction(scalarFunction, filterName, fieldIdx) &&
-          rangeRecorders.at(fieldIdx).setCertainRangeForFunction(filterName)) {
-        subfieldFunctions.emplace_back(scalarFunction);
-      } else {
-        remainingFunctions.emplace_back(scalarFunction);
-      }
-    }
-  }
-}
-
-bool SubstraitToVeloxPlanConverter::RangeRecorder::setCertainRangeForFunction(
-    const std::string& functionName,
-    bool reverse,
-    bool forOrRelation) {
-  if (functionName == sLt || functionName == sLte) {
-    if (reverse) {
-      return setLeftBound(forOrRelation);
-    } else {
-      return setRightBound(forOrRelation);
-    }
-  } else if (functionName == sGt || functionName == sGte) {
-    if (reverse) {
-      return setRightBound(forOrRelation);
-    } else {
-      return setLeftBound(forOrRelation);
-    }
-  } else if (functionName == sEqual) {
-    if (reverse) {
-      // Not equal means lt or gt.
-      return setMultiRange();
-    } else {
-      return setLeftBound(forOrRelation) && setRightBound(forOrRelation);
-    }
-  } else if (functionName == sOr) {
-    if (reverse) {
-      // Not supported.
-      return false;
-    } else {
-      return setMultiRange();
-    }
-  } else if (functionName == sIsNotNull) {
-    if (reverse) {
-      // Not supported.
-      return false;
-    } else {
-      // Is not null can always coexist with the other range.
-      return true;
-    }
-  } else if (functionName == sIsNull) {
-    if (reverse) {
-      return setCertainRangeForFunction(sIsNotNull, false, forOrRelation);
-    } else {
-      return setIsNull();
-    }
-  } else {
-    return false;
-  }
-}
-
-void SubstraitToVeloxPlanConverter::setColumnFilterInfo(
-    const std::string& filterName,
-    std::optional<variant> literalVariant,
-    FilterInfo& columnFilterInfo,
-    bool reverse) {
-  if (filterName == sIsNotNull) {
-    if (reverse) {
-      columnFilterInfo.setNull();
-    } else {
-      columnFilterInfo.forbidsNull();
-    }
-  } else if (filterName == sIsNull) {
-    if (reverse) {
-      columnFilterInfo.forbidsNull();
-    } else {
-      columnFilterInfo.setNull();
-    }
-  } else if (filterName == sGte) {
-    if (reverse) {
-      columnFilterInfo.setUpper(literalVariant, true);
-    } else {
-      columnFilterInfo.setLower(literalVariant, false);
-    }
-  } else if (filterName == sGt) {
-    if (reverse) {
-      columnFilterInfo.setUpper(literalVariant, false);
-    } else {
-      columnFilterInfo.setLower(literalVariant, true);
-    }
-  } else if (filterName == sLte) {
-    if (reverse) {
-      columnFilterInfo.setLower(literalVariant, true);
-    } else {
-      columnFilterInfo.setUpper(literalVariant, false);
-    }
-  } else if (filterName == sLt) {
-    if (reverse) {
-      columnFilterInfo.setLower(literalVariant, false);
-    } else {
-      columnFilterInfo.setUpper(literalVariant, true);
-    }
-  } else if (filterName == sEqual) {
-    if (reverse) {
-      columnFilterInfo.setNotValue(literalVariant);
-    } else {
-      columnFilterInfo.setLower(literalVariant, false);
-      columnFilterInfo.setUpper(literalVariant, false);
-    }
-  } else {
-    VELOX_NYI("setColumnFilterInfo not supported for filter name '{}'", 
filterName);
-  }
-}
-
-template <facebook::velox::TypeKind kind>
-variant getVariantFromLiteral(const ::substrait::Expression::Literal& literal) 
{
-  using LitT = typename facebook::velox::TypeTraits<kind>::NativeType;
-  return variant(SubstraitParser::getLiteralValue<LitT>(literal));
-}
-
-void SubstraitToVeloxPlanConverter::setFilterInfo(
-    const ::substrait::Expression_ScalarFunction& scalarFunction,
-    const std::vector<TypePtr>& inputTypeList,
-    std::vector<FilterInfo>& columnToFilterInfo,
-    bool reverse) {
-  auto nameSpec = SubstraitParser::findFunctionSpec(functionMap_, 
scalarFunction.function_reference());
-  auto functionName = SubstraitParser::getNameBeforeDelimiter(nameSpec);
-
-  // Extract the column index and column bound from the scalar function.
-  std::optional<uint32_t> colIdx;
-  std::optional<::substrait::Expression_Literal> substraitLit;
-  std::vector<std::string> typeCases;
-
-  for (const auto& param : scalarFunction.arguments()) {
-    auto typeCase = param.value().rex_type_case();
-    switch (typeCase) {
-      case ::substrait::Expression::RexTypeCase::kSelection: {
-        typeCases.emplace_back("kSelection");
-        uint32_t index;
-        VELOX_CHECK(
-            
SubstraitParser::parseReferenceSegment(param.value().selection().direct_reference(),
 index),
-            "Failed to parse the column index from the selection.");
-        colIdx = index;
-        break;
-      }
-      case ::substrait::Expression::RexTypeCase::kLiteral: {
-        typeCases.emplace_back("kLiteral");
-        substraitLit = param.value().literal();
-        break;
-      }
-      default:
-        VELOX_NYI("Substrait conversion not supported for arg type '{}'", 
std::to_string(typeCase));
-    }
-  }
-
-  static const std::unordered_map<std::string, std::string> functionRevertMap 
= {
-      {sLt, sGt}, {sGt, sLt}, {sGte, sLte}, {sLte, sGte}};
-
-  // Handle the case where literal is before the variable in a binary 
function, e.g. "123 < q1".
-  if (typeCases.size() > 1 && (typeCases[0] == "kLiteral" && typeCases[1] == 
"kSelection")) {
-    auto x = functionRevertMap.find(functionName);
-    if (x != functionRevertMap.end()) {
-      // Change the function name: lt => gt, gt => lt, gte => lte, lte => gte.
-      functionName = x->second;
-    }
-  }
-
-  if (!colIdx.has_value()) {
-    VELOX_NYI("Column index is expected in subfield filters creation.");
-  }
-
-  // Set the extracted bound to the specific column.
-  uint32_t colIdxVal = colIdx.value();
-  std::optional<variant> val;
-
-  auto inputType = inputTypeList[colIdxVal];
-  switch (inputType->kind()) {
-    case TypeKind::TINYINT:
-    case TypeKind::SMALLINT:
-    case TypeKind::INTEGER:
-    case TypeKind::BIGINT:
-    case TypeKind::REAL:
-    case TypeKind::DOUBLE:
-    case TypeKind::BOOLEAN:
-    case TypeKind::VARCHAR:
-    case TypeKind::HUGEINT:
-      if (substraitLit) {
-        auto kind = inputType->kind();
-        val = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(getVariantFromLiteral, kind, 
substraitLit.value());
-      }
-      break;
-    case TypeKind::ARRAY:
-    case TypeKind::MAP:
-    case TypeKind::ROW:
-      // Doing nothing here can let filter IsNotNull still work.
-      break;
-    default:
-      VELOX_NYI("Subfield filters creation not supported for input type '{}' 
in setFilterInfo", inputType->toString());
-  }
-
-  setColumnFilterInfo(functionName, val, columnToFilterInfo[colIdxVal], 
reverse);
-}
-
-template <TypeKind KIND, typename FilterType>
-void SubstraitToVeloxPlanConverter::createNotEqualFilter(
-    variant notVariant,
-    bool nullAllowed,
-    std::vector<std::unique_ptr<FilterType>>& colFilters) {
-  using NativeType = typename RangeTraits<KIND>::NativeType;
-  using RangeType = typename RangeTraits<KIND>::RangeType;
-  // Value > lower
-  std::unique_ptr<FilterType> lowerFilter;
-  if constexpr (std::is_same_v<RangeType, common::BigintRange>) {
-    if (notVariant.value<NativeType>() < getMax<NativeType>()) {
-      lowerFilter = std::make_unique<common::BigintRange>(
-          notVariant.value<NativeType>() + 1 /*lower*/, getMax<NativeType>() 
/*upper*/, nullAllowed);
-    }
-  } else {
-    lowerFilter = std::make_unique<RangeType>(
-        notVariant.value<NativeType>() /*lower*/,
-        false /*lowerUnbounded*/,
-        true /*lowerExclusive*/,
-        getMax<NativeType>() /*upper*/,
-        true /*upperUnbounded*/,
-        false /*upperExclusive*/,
-        nullAllowed);
-  }
-
-  // Value < upper
-  std::unique_ptr<FilterType> upperFilter;
-  if constexpr (std::is_same_v<RangeType, common::BigintRange>) {
-    if (getLowest<NativeType>() < notVariant.value<NativeType>()) {
-      upperFilter = std::make_unique<common::BigintRange>(
-          getLowest<NativeType>() /*lower*/, notVariant.value<NativeType>() - 
1 /*upper*/, nullAllowed);
-    }
-  } else {
-    upperFilter = std::make_unique<RangeType>(
-        getLowest<NativeType>() /*lower*/,
-        true /*lowerUnbounded*/,
-        false /*lowerExclusive*/,
-        notVariant.value<NativeType>() /*upper*/,
-        false /*upperUnbounded*/,
-        true /*upperExclusive*/,
-        nullAllowed);
-  }
-
-  // To avoid overlap of BigintMultiRange, keep this appending order to make 
sure lower bound of one range is less than
-  // the upper bounds of others.
-  if (upperFilter) {
-    colFilters.emplace_back(std::move(upperFilter));
-  }
-  if (lowerFilter) {
-    colFilters.emplace_back(std::move(lowerFilter));
-  }
-}
-
-template <TypeKind KIND>
-void SubstraitToVeloxPlanConverter::setInFilter(
-    const std::vector<variant>& variants,
-    bool nullAllowed,
-    bool negated,
-    const std::string& inputName,
-    connector::hive::SubfieldFilters& filters) {}
-
-template <>
-void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::BIGINT>(
-    const std::vector<variant>& variants,
-    bool nullAllowed,
-    bool negated,
-    const std::string& inputName,
-    connector::hive::SubfieldFilters& filters) {
-  std::vector<int64_t> values;
-  values.reserve(variants.size());
-  for (const auto& variant : variants) {
-    int64_t value = variant.value<int64_t>();
-    values.emplace_back(value);
-  }
-  if (negated) {
-    filters[common::Subfield(std::move(getPath(inputName)))] = 
common::createNegatedBigintValues(values, nullAllowed);
-  } else {
-    filters[common::Subfield(std::move(getPath(inputName)))] = 
common::createBigintValues(values, nullAllowed);
-  }
-}
-
-template <>
-void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::INTEGER>(
-    const std::vector<variant>& variants,
-    bool nullAllowed,
-    bool negated,
-    const std::string& inputName,
-    connector::hive::SubfieldFilters& filters) {
-  // Use bigint values for int type.
-  std::vector<int64_t> values;
-  values.reserve(variants.size());
-  for (const auto& variant : variants) {
-    // Use the matched type to get value from variant.
-    int64_t value = variant.value<int32_t>();
-    values.emplace_back(value);
-  }
-  if (negated) {
-    filters[common::Subfield(std::move(getPath(inputName)))] = 
common::createNegatedBigintValues(values, nullAllowed);
-  } else {
-    filters[common::Subfield(std::move(getPath(inputName)))] = 
common::createBigintValues(values, nullAllowed);
-  }
-}
-
-template <>
-void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::SMALLINT>(
-    const std::vector<variant>& variants,
-    bool nullAllowed,
-    bool negated,
-    const std::string& inputName,
-    connector::hive::SubfieldFilters& filters) {
-  // Use bigint values for small int type.
-  std::vector<int64_t> values;
-  values.reserve(variants.size());
-  for (const auto& variant : variants) {
-    // Use the matched type to get value from variant.
-    int64_t value = variant.value<int16_t>();
-    values.emplace_back(value);
-  }
-  if (negated) {
-    filters[common::Subfield(std::move(getPath(inputName)))] = 
common::createNegatedBigintValues(values, nullAllowed);
-  } else {
-    filters[common::Subfield(std::move(getPath(inputName)))] = 
common::createBigintValues(values, nullAllowed);
-  }
-}
-
-template <>
-void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::TINYINT>(
-    const std::vector<variant>& variants,
-    bool nullAllowed,
-    bool negated,
-    const std::string& inputName,
-    connector::hive::SubfieldFilters& filters) {
-  // Use bigint values for tiny int type.
-  std::vector<int64_t> values;
-  values.reserve(variants.size());
-  for (const auto& variant : variants) {
-    // Use the matched type to get value from variant.
-    int64_t value = variant.value<int8_t>();
-    values.emplace_back(value);
-  }
-  if (negated) {
-    filters[common::Subfield(std::move(getPath(inputName)))] = 
common::createNegatedBigintValues(values, nullAllowed);
-  } else {
-    filters[common::Subfield(std::move(getPath(inputName)))] = 
common::createBigintValues(values, nullAllowed);
-  }
-}
-
-template <>
-void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::VARCHAR>(
-    const std::vector<variant>& variants,
-    bool nullAllowed,
-    bool negated,
-    const std::string& inputName,
-    connector::hive::SubfieldFilters& filters) {
-  std::vector<std::string> values;
-  values.reserve(variants.size());
-  for (const auto& variant : variants) {
-    std::string value = variant.value<std::string>();
-    values.emplace_back(value);
-  }
-  if (negated) {
-    filters[common::Subfield(std::move(getPath(inputName)))] =
-        std::make_unique<common::NegatedBytesValues>(values, nullAllowed);
-  } else {
-    filters[common::Subfield(std::move(getPath(inputName)))] =
-        std::make_unique<common::BytesValues>(values, nullAllowed);
-  }
-}
-
-template <TypeKind KIND, typename FilterType>
-void SubstraitToVeloxPlanConverter::setSubfieldFilter(
-    std::vector<std::unique_ptr<FilterType>> colFilters,
-    const std::string& inputName,
-    bool nullAllowed,
-    connector::hive::SubfieldFilters& filters) {
-  using MultiRangeType = typename RangeTraits<KIND>::MultiRangeType;
-
-  if (colFilters.size() == 1) {
-    filters[common::Subfield(std::move(getPath(inputName)))] = 
std::move(colFilters[0]);
-  } else if (colFilters.size() > 1) {
-    // BigintMultiRange should have been sorted
-    if (colFilters[0]->kind() == common::FilterKind::kBigintRange) {
-      std::sort(colFilters.begin(), colFilters.end(), [](const auto& a, const 
auto& b) {
-        return dynamic_cast<common::BigintRange*>(a.get())->lower() <
-            dynamic_cast<common::BigintRange*>(b.get())->lower();
-      });
-    }
-    if constexpr (std::is_same_v<MultiRangeType, common::MultiRange>) {
-      filters[common::Subfield(std::move(getPath(inputName)))] =
-          std::make_unique<common::MultiRange>(std::move(colFilters), 
nullAllowed, true /*nanAllowed*/);
-    } else {
-      filters[common::Subfield(std::move(getPath(inputName)))] =
-          std::make_unique<MultiRangeType>(std::move(colFilters), nullAllowed);
-    }
-  }
-}
-
-template <TypeKind KIND, typename FilterType>
-void SubstraitToVeloxPlanConverter::constructSubfieldFilters(
-    uint32_t colIdx,
-    const std::string& inputName,
-    const TypePtr& inputType,
-    const FilterInfo& filterInfo,
-    connector::hive::SubfieldFilters& filters) {
-  if (!filterInfo.isInitialized()) {
-    return;
-  }
-
-  bool nullAllowed = filterInfo.nullAllowed_;
-  bool isNull = filterInfo.isNull_;
-  bool existIsNullAndIsNotNull = filterInfo.forbidsNullSet_ && 
filterInfo.isNullSet_;
-  uint32_t rangeSize = std::max(filterInfo.lowerBounds_.size(), 
filterInfo.upperBounds_.size());
-
-  if constexpr (KIND == facebook::velox::TypeKind::HUGEINT) {
-    // TODO: open it when the Velox's modification is ready.
-    VELOX_NYI("constructSubfieldFilters not support for HUGEINT type");
-  } else if constexpr (KIND == facebook::velox::TypeKind::BOOLEAN) {
-    // Handle bool type filters.
-    // Not equal.
-    if (filterInfo.notValue_) {
-      filters[common::Subfield(std::move(getPath(inputName)))] =
-          
std::make_unique<common::BoolValue>(!filterInfo.notValue_.value().value<bool>(),
 nullAllowed);
-    } else if (filterInfo.notValues_.size() > 0) {
-      std::set<bool> notValues;
-      for (auto v : filterInfo.notValues_) {
-        notValues.emplace(v.value<bool>());
-      }
-      if (notValues.size() == 1) {
-        filters[common::Subfield(std::move(getPath(inputName)))] =
-            std::make_unique<common::BoolValue>(!(*notValues.begin()), 
nullAllowed);
-      } else {
-        // if there are more than one distinct value in NOT IN list, the 
filter should be AlwaysFalse
-        filters[common::Subfield(std::move(getPath(inputName)))] = 
std::make_unique<common::AlwaysFalse>();
-      }
-    } else if (rangeSize == 0) {
-      // IsNull/IsNotNull.
-      if (!nullAllowed) {
-        filters[common::Subfield(std::move(getPath(inputName)))] = 
std::make_unique<common::IsNotNull>();
-      } else if (isNull) {
-        filters[common::Subfield(std::move(getPath(inputName)))] = 
std::make_unique<common::IsNull>();
-      } else {
-        VELOX_NYI("Only IsNotNull and IsNull are supported in 
constructSubfieldFilters when no other filter ranges.");
-      }
-      return;
-    } else {
-      // Equal.
-      auto value = filterInfo.lowerBounds_[0].value().value<bool>();
-      VELOX_CHECK(value == filterInfo.upperBounds_[0].value().value<bool>(), 
"invalid state of bool equal");
-      filters[common::Subfield(std::move(getPath(inputName)))] =
-          std::make_unique<common::BoolValue>(value, nullAllowed);
-    }
-  } else if constexpr (
-      KIND == facebook::velox::TypeKind::ARRAY || KIND == 
facebook::velox::TypeKind::MAP ||
-      KIND == facebook::velox::TypeKind::ROW) {
-    // Only IsNotNull and IsNull are supported for complex types.
-    VELOX_CHECK_EQ(rangeSize, 0, "Only IsNotNull and IsNull are supported for 
complex type.");
-    if (!nullAllowed) {
-      filters[common::Subfield(std::move(getPath(inputName)))] = 
std::make_unique<common::IsNotNull>();
-    } else if (isNull) {
-      filters[common::Subfield(std::move(getPath(inputName)))] = 
std::make_unique<common::IsNull>();
-    } else {
-      VELOX_NYI("Only IsNotNull and IsNull are supported for input type 
'{}'.", inputType->toString());
-    }
-  } else {
-    using NativeType = typename RangeTraits<KIND>::NativeType;
-    using RangeType = typename RangeTraits<KIND>::RangeType;
-    using MultiRangeType = typename RangeTraits<KIND>::MultiRangeType;
-
-    // Handle 'in' filter.
-    if (filterInfo.values_.size() > 0) {
-      // To filter out null is a default behaviour of Spark IN expression.
-      nullAllowed = false;
-      setInFilter<KIND>(filterInfo.values_, nullAllowed, false, inputName, 
filters);
-      // Currently, In cannot coexist with other filter conditions
-      // due to multirange is in 'OR' relation but 'AND' is needed.
-      VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot 
be supported after IN filter.");
-      VELOX_CHECK(!filterInfo.notValue_.has_value(), "Not equal cannot be 
supported after IN filter.");
-      VELOX_CHECK(filterInfo.notValues_.size() == 0, "Not in cannot be 
supported after IN filter.");
-      return;
-    }
-
-    // Handle not in filter.
-    if (filterInfo.notValues_.size() > 0) {
-      setInFilter<KIND>(filterInfo.notValues_, filterInfo.nullAllowed_, true, 
inputName, filters);
-      // Currently, NOT In cannot coexist with other filter conditions
-      // due to multirange is in 'OR' relation but 'AND' is needed.
-      VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot 
be supported after NOT IN filter.");
-      VELOX_CHECK(!filterInfo.notValue_.has_value(), "Not equal cannot be 
supported after NOT IN filter.");
-      return;
-    }
-
-    // Construct the Filters.
-    std::vector<std::unique_ptr<FilterType>> colFilters;
-
-    // Handle not(equal) filter.
-    if (filterInfo.notValue_) {
-      variant notVariant = filterInfo.notValue_.value();
-      createNotEqualFilter<KIND, FilterType>(notVariant, 
filterInfo.nullAllowed_, colFilters);
-      // Currently, Not-equal cannot coexist with other filter conditions
-      // due to multirange is in 'OR' relation but 'AND' is needed.
-      VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot 
be supported after not-equal filter.");
-      if constexpr (std::is_same_v<MultiRangeType, common::MultiRange>) {
-        if (colFilters.size() == 1) {
-          filters[common::Subfield(std::move(getPath(inputName)))] = 
std::move(colFilters.front());
-        } else {
-          filters[common::Subfield(std::move(getPath(inputName)))] =
-              std::make_unique<common::MultiRange>(std::move(colFilters), 
nullAllowed, true /*nanAllowed*/);
-        }
-      } else {
-        if (colFilters.size() == 1) {
-          filters[common::Subfield(std::move(getPath(inputName)))] = 
std::move(colFilters.front());
-        } else {
-          filters[common::Subfield(std::move(getPath(inputName)))] =
-              std::make_unique<MultiRangeType>(std::move(colFilters), 
nullAllowed);
-        }
-      }
-      return;
-    }
-
-    // Handle null filtering.
-    if (rangeSize == 0) {
-      // handle is not null and is null exists at same time
-      if (existIsNullAndIsNotNull) {
-        filters[common::Subfield(std::move(getPath(inputName)))] = 
std::move(std::make_unique<common::AlwaysFalse>());
-      } else if (!nullAllowed) {
-        filters[common::Subfield(std::move(getPath(inputName)))] = 
std::make_unique<common::IsNotNull>();
-      } else if (isNull) {
-        filters[common::Subfield(std::move(getPath(inputName)))] = 
std::make_unique<common::IsNull>();
-      } else {
-        VELOX_NYI("Only IsNotNull and IsNull are supported in 
constructSubfieldFilters when no other filter ranges.");
-      }
-      return;
-    }
-
-    NativeType lowerBound;
-    if constexpr (KIND == facebook::velox::TypeKind::BIGINT) {
-      if (inputType->isShortDecimal()) {
-        lowerBound = DecimalUtil::kShortDecimalMin;
-      } else {
-        lowerBound = getLowest<NativeType>();
-      }
-    } else {
-      lowerBound = getLowest<NativeType>();
-    }
-
-    NativeType upperBound;
-    if constexpr (KIND == facebook::velox::TypeKind::BIGINT) {
-      if (inputType->isShortDecimal()) {
-        upperBound = DecimalUtil::kShortDecimalMax;
-      } else {
-        upperBound = getMax<NativeType>();
-      }
-    } else {
-      upperBound = getMax<NativeType>();
-    }
-
-    [[maybe_unused]] bool lowerUnbounded = true;
-    [[maybe_unused]] bool upperUnbounded = true;
-    bool lowerExclusive = false;
-    bool upperExclusive = false;
-
-    // Handle other filter ranges.
-    for (uint32_t idx = 0; idx < rangeSize; idx++) {
-      if (idx < filterInfo.lowerBounds_.size() && 
filterInfo.lowerBounds_[idx]) {
-        lowerUnbounded = false;
-        variant lowerVariant = filterInfo.lowerBounds_[idx].value();
-        lowerBound = lowerVariant.value<NativeType>();
-        lowerExclusive = filterInfo.lowerExclusives_[idx];
-      }
-
-      if (idx < filterInfo.upperBounds_.size() && 
filterInfo.upperBounds_[idx]) {
-        upperUnbounded = false;
-        variant upperVariant = filterInfo.upperBounds_[idx].value();
-        upperBound = upperVariant.value<NativeType>();
-        upperExclusive = filterInfo.upperExclusives_[idx];
-      }
-
-      std::unique_ptr<FilterType> filter;
-      if constexpr (std::is_same_v<RangeType, common::BigintRange>) {
-        filter = std::move(std::make_unique<common::BigintRange>(
-            lowerExclusive ? lowerBound + 1 : lowerBound, upperExclusive ? 
upperBound - 1 : upperBound, nullAllowed));
-      } else {
-        filter = std::move(std::make_unique<RangeType>(
-            lowerBound, lowerUnbounded, lowerExclusive, upperBound, 
upperUnbounded, upperExclusive, nullAllowed));
-      }
-
-      colFilters.emplace_back(std::move(filter));
-    }
-
-    // Set the SubfieldFilter.
-    setSubfieldFilter<KIND, FilterType>(std::move(colFilters), inputName, 
filterInfo.nullAllowed_, filters);
-  }
-}
-
 bool SubstraitToVeloxPlanConverter::checkTypeExtension(const 
::substrait::Plan& substraitPlan) {
   for (const auto& sExtension : substraitPlan.extensions()) {
     if (!sExtension.has_extension_type()) {
@@ -2367,199 +1396,4 @@ bool 
SubstraitToVeloxPlanConverter::checkTypeExtension(const ::substrait::Plan&
   return true;
 }
 
-connector::hive::SubfieldFilters SubstraitToVeloxPlanConverter::mapToFilters(
-    const std::vector<std::string>& inputNameList,
-    const std::vector<TypePtr>& inputTypeList,
-    std::vector<FilterInfo>& columnToFilterInfo) {
-  // Construct the subfield filters based on the filter info map.
-  connector::hive::SubfieldFilters filters;
-  for (uint32_t colIdx = 0; colIdx < inputNameList.size(); colIdx++) {
-    if (columnToFilterInfo[colIdx].isInitialized()) {
-      auto inputType = inputTypeList[colIdx];
-      if (inputType->isDate()) {
-        constructSubfieldFilters<TypeKind::INTEGER, common::BigintRange>(
-            colIdx, inputNameList[colIdx], inputType, 
columnToFilterInfo[colIdx], filters);
-        continue;
-      }
-      switch (inputType->kind()) {
-        case TypeKind::TINYINT:
-          constructSubfieldFilters<TypeKind::TINYINT, common::BigintRange>(
-              colIdx, inputNameList[colIdx], inputType, 
columnToFilterInfo[colIdx], filters);
-          break;
-        case TypeKind::SMALLINT:
-          constructSubfieldFilters<TypeKind::SMALLINT, common::BigintRange>(
-              colIdx, inputNameList[colIdx], inputType, 
columnToFilterInfo[colIdx], filters);
-          break;
-        case TypeKind::INTEGER:
-          constructSubfieldFilters<TypeKind::INTEGER, common::BigintRange>(
-              colIdx, inputNameList[colIdx], inputType, 
columnToFilterInfo[colIdx], filters);
-          break;
-        case TypeKind::BIGINT:
-          constructSubfieldFilters<TypeKind::BIGINT, common::BigintRange>(
-              colIdx, inputNameList[colIdx], inputType, 
columnToFilterInfo[colIdx], filters);
-          break;
-        case TypeKind::REAL:
-          constructSubfieldFilters<TypeKind::REAL, common::Filter>(
-              colIdx, inputNameList[colIdx], inputType, 
columnToFilterInfo[colIdx], filters);
-          break;
-        case TypeKind::DOUBLE:
-          constructSubfieldFilters<TypeKind::DOUBLE, common::Filter>(
-              colIdx, inputNameList[colIdx], inputType, 
columnToFilterInfo[colIdx], filters);
-          break;
-        case TypeKind::BOOLEAN:
-          constructSubfieldFilters<TypeKind::BOOLEAN, common::BoolValue>(
-              colIdx, inputNameList[colIdx], inputType, 
columnToFilterInfo[colIdx], filters);
-          break;
-        case TypeKind::VARCHAR:
-          constructSubfieldFilters<TypeKind::VARCHAR, common::Filter>(
-              colIdx, inputNameList[colIdx], inputType, 
columnToFilterInfo[colIdx], filters);
-          break;
-        case TypeKind::HUGEINT:
-          constructSubfieldFilters<TypeKind::HUGEINT, common::HugeintRange>(
-              colIdx, inputNameList[colIdx], inputType, 
columnToFilterInfo[colIdx], filters);
-          break;
-        case TypeKind::ARRAY:
-          constructSubfieldFilters<TypeKind::ARRAY, common::Filter>(
-              colIdx, inputNameList[colIdx], inputType, 
columnToFilterInfo[colIdx], filters);
-          break;
-        case TypeKind::MAP:
-          constructSubfieldFilters<TypeKind::MAP, common::Filter>(
-              colIdx, inputNameList[colIdx], inputType, 
columnToFilterInfo[colIdx], filters);
-          break;
-        case TypeKind::ROW:
-          constructSubfieldFilters<TypeKind::ROW, common::Filter>(
-              colIdx, inputNameList[colIdx], inputType, 
columnToFilterInfo[colIdx], filters);
-          break;
-        default:
-          VELOX_NYI(
-              "Subfield filters creation not supported for input type '{}' in 
mapToFilters", inputType->toString());
-      }
-    }
-  }
-
-  return filters;
-}
-
-core::TypedExprPtr SubstraitToVeloxPlanConverter::connectWithAnd(
-    std::vector<std::string> inputNameList,
-    std::vector<TypePtr> inputTypeList,
-    const std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
-    const std::vector<::substrait::Expression_SingularOrList>& singularOrLists,
-    const std::vector<::substrait::Expression_IfThen>& ifThens) {
-  if (scalarFunctions.size() == 0 && singularOrLists.size() == 0 && 
ifThens.size() == 0) {
-    return nullptr;
-  }
-  auto inputType = ROW(std::move(inputNameList), std::move(inputTypeList));
-
-  // Filter for scalar functions.
-  std::vector<core::TypedExprPtr> allFilters;
-  for (auto scalar : scalarFunctions) {
-    auto filter = exprConverter_->toVeloxExpr(scalar, inputType);
-    if (filter != nullptr) {
-      allFilters.emplace_back(filter);
-    }
-  }
-
-  for (auto orList : singularOrLists) {
-    auto filter = exprConverter_->toVeloxExpr(orList, inputType);
-    if (filter != nullptr) {
-      allFilters.emplace_back(filter);
-    }
-  }
-
-  for (auto ifThen : ifThens) {
-    auto filter = exprConverter_->toVeloxExpr(ifThen, inputType);
-    if (filter != nullptr) {
-      allFilters.emplace_back(filter);
-    }
-  }
-  VELOX_CHECK_GT(allFilters.size(), 0, "One filter should be valid.");
-  core::TypedExprPtr andFilter = allFilters[0];
-  for (auto i = 1; i < allFilters.size(); i++) {
-    andFilter = connectWithAnd(andFilter, allFilters[i]);
-  }
-
-  return andFilter;
-}
-
-core::TypedExprPtr SubstraitToVeloxPlanConverter::connectWithAnd(
-    core::TypedExprPtr leftExpr,
-    core::TypedExprPtr rightExpr) {
-  std::vector<core::TypedExprPtr> params;
-  params.reserve(2);
-  params.emplace_back(leftExpr);
-  params.emplace_back(rightExpr);
-  return std::make_shared<const core::CallTypedExpr>(BOOLEAN(), 
std::move(params), "and");
-}
-
-bool SubstraitToVeloxPlanConverter::canPushdownSingularOrList(
-    const ::substrait::Expression_SingularOrList& singularOrList,
-    bool disableIntLike) {
-  VELOX_CHECK(singularOrList.options_size() > 0, "At least one option is 
expected.");
-  // Check whether the value is field.
-  bool hasField = singularOrList.value().has_selection();
-  const auto& options = singularOrList.options();
-  for (const auto& option : options) {
-    VELOX_CHECK(option.has_literal(), "Literal is expected as option.");
-    auto type = option.literal().literal_type_case();
-    // Only BigintValues and BytesValues are supported.
-    if (type != ::substrait::Expression_Literal::LiteralTypeCase::kI32 &&
-        type != ::substrait::Expression_Literal::LiteralTypeCase::kI64 &&
-        type != ::substrait::Expression_Literal::LiteralTypeCase::kString) {
-      return false;
-    }
-
-    // BigintMultiRange can only accept BigintRange, so disableIntLike is set 
to
-    // true for OR pushdown of int-like types.
-    if (disableIntLike &&
-        (type == ::substrait::Expression_Literal::LiteralTypeCase::kI32 ||
-         type == ::substrait::Expression_Literal::LiteralTypeCase::kI64)) {
-      return false;
-    }
-  }
-  return hasField;
-}
-
-uint32_t SubstraitToVeloxPlanConverter::getColumnIndexFromSingularOrList(
-    const ::substrait::Expression_SingularOrList& singularOrList) {
-  // Get the column index.
-  ::substrait::Expression_FieldReference selection;
-  if (singularOrList.value().has_scalar_function()) {
-    selection = 
singularOrList.value().scalar_function().arguments()[0].value().selection();
-  } else if (singularOrList.value().has_selection()) {
-    selection = singularOrList.value().selection();
-  } else {
-    VELOX_FAIL("Unsupported type in IN pushdown.");
-  }
-  uint32_t index;
-  VELOX_CHECK(
-      SubstraitParser::parseReferenceSegment(selection.direct_reference(), 
index),
-      "Failed to parse column index from SingularOrList.");
-  return index;
-}
-
-void SubstraitToVeloxPlanConverter::setFilterInfo(
-    const ::substrait::Expression_SingularOrList& singularOrList,
-    std::vector<FilterInfo>& columnToFilterInfo,
-    bool reverse) {
-  VELOX_CHECK(singularOrList.options_size() > 0, "At least one option is 
expected.");
-  // Get the column index.
-  uint32_t colIdx = getColumnIndexFromSingularOrList(singularOrList);
-
-  // Get the value list.
-  const auto& options = singularOrList.options();
-  std::vector<variant> variants;
-  variants.reserve(options.size());
-  for (const auto& option : options) {
-    VELOX_CHECK(option.has_literal(), "Literal is expected as option.");
-    
variants.emplace_back(exprConverter_->toVeloxExpr(option.literal())->value());
-  }
-  // Set the value list to filter info.
-  if (!reverse) {
-    columnToFilterInfo[colIdx].setValues(variants);
-  } else {
-    columnToFilterInfo[colIdx].setNotValues(variants);
-  }
-}
-
 } // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h 
b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 0e892469d0..51e50ce347 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -215,354 +215,13 @@ class SubstraitToVeloxPlanConverter {
   /// if output order is 'kDriect'.
   core::PlanNodePtr processEmit(const ::substrait::RelCommon& relCommon, const core::PlanNodePtr& noEmitNode);
 
-  /// Multiple conditions are connected to a binary tree structure with
-  /// the relation key words, including AND, OR, and etc. Currently, only
-  /// AND is supported. This function is used to extract all the Substrait
-  /// conditions in the binary tree structure into a vector.
-  void flattenConditions(
-      const ::substrait::Expression& sFilter,
-      std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
-      std::vector<::substrait::Expression_SingularOrList>& singularOrLists,
-      std::vector<::substrait::Expression_IfThen>& ifThens);
-
   /// Check the Substrait type extension only has one unknown extension.
   static bool checkTypeExtension(const ::substrait::Plan& substraitPlan);
 
-  /// Range filter recorder for a field is used to make sure only the conditions
-  /// that can coexist for this field being pushed down with a range filter.
-  class RangeRecorder {
-   public:
-    /// Set the existence of values range and returns whether this condition can
-    /// coexist with existing conditions for one field. Conditions in OR
-    /// relation can coexist with each other.
-    bool setInRange(bool forOrRelation = false) {
-      if (forOrRelation) {
-        return true;
-      }
-      if (inRange_ || multiRange_ || leftBound_ || rightBound_ || isNull_) {
-        return false;
-      }
-      inRange_ = true;
-      return true;
-    }
-
-    /// Set the existence of left bound and returns whether it can coexist with
-    /// existing conditions for this field.
-    bool setLeftBound(bool forOrRelation = false) {
-      if (forOrRelation) {
-        if (!rightBound_)
-          leftBound_ = true;
-        return !rightBound_;
-      }
-      if (leftBound_ || inRange_ || multiRange_ || isNull_) {
-        return false;
-      }
-      leftBound_ = true;
-      return true;
-    }
-
-    /// Set the existence of right bound and returns whether it can coexist with
-    /// existing conditions for this field.
-    bool setRightBound(bool forOrRelation = false) {
-      if (forOrRelation) {
-        if (!leftBound_)
-          rightBound_ = true;
-        return !leftBound_;
-      }
-      if (rightBound_ || inRange_ || multiRange_ || isNull_) {
-        return false;
-      }
-      rightBound_ = true;
-      return true;
-    }
-
-    /// Set the existence of multi-range and returns whether it can coexist with
-    /// existing conditions for this field.
-    bool setMultiRange() {
-      if (inRange_ || multiRange_ || leftBound_ || rightBound_ || isNull_) {
-        return false;
-      }
-      multiRange_ = true;
-      return true;
-    }
-
-    /// Set the existence of IsNull and returns whether it can coexist with
-    /// existing conditions for this field.
-    bool setIsNull() {
-      if (inRange_ || multiRange_ || leftBound_ || rightBound_) {
-        return false;
-      }
-      isNull_ = true;
-      return true;
-    }
-
-    /// Set certain existence according to function name and returns whether it
-    /// can coexist with existing conditions for this field.
-    bool setCertainRangeForFunction(const std::string& functionName, bool reverse = false, bool forOrRelation = false);
-
-   private:
-    /// The existence of values range.
-    bool inRange_ = false;
-
-    /// The existence of left bound.
-    bool leftBound_ = false;
-
-    /// The existence of right bound.
-    bool rightBound_ = false;
-
-    /// The existence of multi-range.
-    bool multiRange_ = false;
-
-    /// The existence of IsNull.
-    bool isNull_ = false;
-  };
-
-  /// Filter info for a column used in filter push down.
-  class FilterInfo {
-   public:
-    // Null is not allowed.
-    void forbidsNull() {
-      nullAllowed_ = false;
-      if (!initialized_) {
-        initialized_ = true;
-      }
-      forbidsNullSet_ = true;
-    }
-
-    // Only null is allowed.
-    void setNull() {
-      isNull_ = true;
-      nullAllowed_ = true;
-      if (!initialized_) {
-        initialized_ = true;
-      }
-      isNullSet_ = true;
-    }
-
-    // Return the initialization status.
-    bool isInitialized() const {
-      return initialized_;
-    }
-
-    // Add a lower bound to the range. Multiple lower bounds are
-    // regarded to be in 'or' relation.
-    void setLower(const std::optional<variant>& left, bool isExclusive) {
-      lowerBounds_.emplace_back(left);
-      lowerExclusives_.emplace_back(isExclusive);
-      if (!initialized_) {
-        initialized_ = true;
-      }
-    }
-
-    // Add a upper bound to the range. Multiple upper bounds are
-    // regarded to be in 'or' relation.
-    void setUpper(const std::optional<variant>& right, bool isExclusive) {
-      upperBounds_.emplace_back(right);
-      upperExclusives_.emplace_back(isExclusive);
-      if (!initialized_) {
-        initialized_ = true;
-      }
-    }
-
-    // Set a list of values to be used in the push down of 'in' expression.
-    void setValues(const std::vector<variant>& values) {
-      for (const auto& value : values) {
-        values_.emplace_back(value);
-      }
-      if (!initialized_) {
-        initialized_ = true;
-      }
-    }
-
-    // Set a value for the not(equal) condition.
-    void setNotValue(const std::optional<variant>& notValue) {
-      notValue_ = notValue;
-      if (!initialized_) {
-        initialized_ = true;
-      }
-    }
-
-    // Set a list of values to be used in the push down of 'not in' expression.
-    void setNotValues(const std::vector<variant>& notValues) {
-      for (const auto& value : notValues) {
-        notValues_.emplace_back(value);
-      }
-      if (!initialized_) {
-        initialized_ = true;
-      }
-    }
-
-    // Whether this filter map is initialized.
-    bool initialized_ = false;
-
-    bool nullAllowed_ = false;
-    bool isNull_ = false;
-    bool forbidsNullSet_ = false;
-    bool isNullSet_ = false;
-
-    // If true, left bound will be exclusive.
-    std::vector<bool> lowerExclusives_;
-
-    // If true, right bound will be exclusive.
-    std::vector<bool> upperExclusives_;
-
-    // A value should not be equal to.
-    std::optional<variant> notValue_ = std::nullopt;
-
-    // The lower bounds in 'or' relation.
-    std::vector<std::optional<variant>> lowerBounds_;
-
-    // The upper bounds in 'or' relation.
-    std::vector<std::optional<variant>> upperBounds_;
-
-    // The list of values used in 'in' expression.
-    std::vector<variant> values_;
-
-    // The list of values should not be equal to.
-    std::vector<variant> notValues_;
-  };
-
   /// Returns unique ID to use for plan node. Produces sequential numbers
   /// starting from zero.
   std::string nextPlanNodeId();
 
-  /// Returns whether the args of a scalar function being field or
-  /// field with literal. If yes, extract and set the field index.
-  static bool fieldOrWithLiteral(
-      const ::google::protobuf::RepeatedPtrField<::substrait::FunctionArgument>& arguments,
-      uint32_t& fieldIndex);
-
-  /// Separate the functions to be two parts:
-  /// subfield functions to be handled by the subfieldFilters in HiveConnector,
-  /// and remaining functions to be handled by the remainingFilter in
-  /// HiveConnector.
-  void separateFilters(
-      std::vector<RangeRecorder>& rangeRecorders,
-      const std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
-      std::vector<::substrait::Expression_ScalarFunction>& subfieldFunctions,
-      std::vector<::substrait::Expression_ScalarFunction>& remainingFunctions,
-      const std::vector<::substrait::Expression_SingularOrList>& singularOrLists,
-      std::vector<::substrait::Expression_SingularOrList>& subfieldrOrLists,
-      std::vector<::substrait::Expression_SingularOrList>& remainingrOrLists,
-      const std::vector<TypePtr>& veloxTypeList,
-      const dwio::common::FileFormat& format);
-
-  /// Returns whether a function can be pushed down.
-  static bool canPushdownFunction(
-      const ::substrait::Expression_ScalarFunction& scalarFunction,
-      const std::string& filterName,
-      uint32_t& fieldIdx);
-
-  /// Returns whether a NOT function can be pushed down.
-  bool canPushdownNot(
-      const ::substrait::Expression_ScalarFunction& scalarFunction,
-      std::vector<RangeRecorder>& rangeRecorders);
-
-  /// Returns whether a OR function can be pushed down.
-  bool canPushdownOr(
-      const ::substrait::Expression_ScalarFunction& scalarFunction,
-      std::vector<RangeRecorder>& rangeRecorders);
-
-  /// Returns whether a SingularOrList can be pushed down.
-  static bool canPushdownSingularOrList(
-      const ::substrait::Expression_SingularOrList& singularOrList,
-      bool disableIntLike = false);
-
-  /// Check whether the children functions of this scalar function have the same
-  /// column index. Curretly used to check whether the two chilren functions of
-  /// 'or' expression are effective on the same column.
-  static bool childrenFunctionsOnSameField(const ::substrait::Expression_ScalarFunction& function);
-
-  /// Extract the scalar function, and set the filter info for different types
-  /// of columns. If reverse is true, the opposite filter info will be set.
-  void setFilterInfo(
-      const ::substrait::Expression_ScalarFunction& scalarFunction,
-      const std::vector<TypePtr>& inputTypeList,
-      std::vector<FilterInfo>& columnToFilterInfo,
-      bool reverse = false);
-
-  /// Extract SingularOrList and set it to the filter info map.
-  /// If reverse is true, the opposite filter info will be set.
-  void setFilterInfo(
-      const ::substrait::Expression_SingularOrList& singularOrList,
-      std::vector<FilterInfo>& columnToFilterInfo,
-      bool reverse = false);
-
-  /// Extract SingularOrList and returns the field index.
-  static uint32_t getColumnIndexFromSingularOrList(const ::substrait::Expression_SingularOrList&);
-
-  /// Set the filter info for a column base on the information
-  /// extracted from filter condition.
-  static void setColumnFilterInfo(
-      const std::string& filterName,
-      std::optional<variant> literalVariant,
-      FilterInfo& columnToFilterInfo,
-      bool reverse);
-
-  /// Create a multirange to specify the filter 'x != notValue' with:
-  /// x > notValue or x < notValue.
-  template <TypeKind KIND, typename FilterType>
-  void createNotEqualFilter(variant notVariant, bool nullAllowed, std::vector<std::unique_ptr<FilterType>>& colFilters);
-
-  /// Create a values range to handle (not) in filter.
-  /// variants: the list of values extracted from the (not) in expression.
-  //  negated: false for IN filter, true for NOT IN filter.
-  /// inputName: the column input name.
-  template <TypeKind KIND>
-  void setInFilter(
-      const std::vector<variant>& variants,
-      bool nullAllowed,
-      bool negated,
-      const std::string& inputName,
-      connector::hive::SubfieldFilters& filters);
-
-  /// Set the constructed filters into SubfieldFilters.
-  /// The FilterType is used to distinguish BigintRange and
-  /// Filter (the base class). This is needed because BigintMultiRange
-  /// can only accept the unique ptr of BigintRange as parameter.
-  template <TypeKind KIND, typename FilterType>
-  void setSubfieldFilter(
-      std::vector<std::unique_ptr<FilterType>> colFilters,
-      const std::string& inputName,
-      bool nullAllowed,
-      connector::hive::SubfieldFilters& filters);
-
-  /// Create the subfield filter based on the constructed filter info.
-  /// inputName: the input name of a column.
-  template <TypeKind KIND, typename FilterType>
-  void constructSubfieldFilters(
-      uint32_t colIdx,
-      const std::string& inputName,
-      const TypePtr& inputType,
-      const FilterInfo& filterInfo,
-      connector::hive::SubfieldFilters& filters);
-
-  /// Construct subfield filters according to the pre-set map of filter info.
-  connector::hive::SubfieldFilters mapToFilters(
-      const std::vector<std::string>& inputNameList,
-      const std::vector<TypePtr>& inputTypeList,
-      std::vector<FilterInfo>& columnToFilterInfo);
-
-  /// Convert subfield functions into subfieldFilters to
-  /// be used in Hive Connector.
-  connector::hive::SubfieldFilters createSubfieldFilters(
-      const std::vector<std::string>& inputNameList,
-      const std::vector<TypePtr>& inputTypeList,
-      const std::vector<::substrait::Expression_ScalarFunction>& subfieldFunctions,
-      const std::vector<::substrait::Expression_SingularOrList>& singularOrLists);
-
-  /// Connect all remaining functions with 'and' relation
-  /// for the use of remaingFilter in Hive Connector.
-  core::TypedExprPtr connectWithAnd(
-      std::vector<std::string> inputNameList,
-      std::vector<TypePtr> inputTypeList,
-      const std::vector<::substrait::Expression_ScalarFunction>& remainingFunctions,
-      const std::vector<::substrait::Expression_SingularOrList>& singularOrLists,
-      const std::vector<::substrait::Expression_IfThen>& ifThens);
-
-  /// Connect the left and right expressions with 'and' relation.
-  core::TypedExprPtr connectWithAnd(core::TypedExprPtr leftExpr, core::TypedExprPtr rightExpr);
-
   /// Used to convert AggregateRel into Velox plan node.
   /// The output of child node will be used as the input of Aggregation.
   std::shared_ptr<const core::PlanNode> toVeloxAgg(
diff --git a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
index 06d4ea0195..3193f53bfb 100644
--- a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
+++ b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
@@ -256,12 +256,7 @@ TEST_F(Substrait2VeloxPlanConversionTest, ifthenTest) {
   // Convert to Velox PlanNode.
   auto planNode = planConverter_->toVeloxPlan(substraitPlan, std::vector<::substrait::ReadRel_LocalFiles>{split});
   ASSERT_EQ(
-      "-- Project[1][expressions: ] -> \n  "
-      "-- TableScan[0][table: hive_table, range filters: [(hd_demo_sk, Filter(IsNotNull, deterministic, null not allowed)),"
-      " (hd_vehicle_count, BigintRange: [1, 9223372036854775807] no nulls)], remaining filter: "
-      "(and(or(equalto(\"hd_buy_potential\",\">10000\"),equalto(\"hd_buy_potential\",\"unknown\")),"
-      "if(greaterthan(\"hd_vehicle_count\",0),greaterthan(divide(cast \"hd_dep_count\" as DOUBLE,"
-      "cast \"hd_vehicle_count\" as DOUBLE),1.2))))] -> n0_0:BIGINT, n0_1:VARCHAR, n0_2:BIGINT, n0_3:BIGINT\n",
+      "-- Project[1][expressions: ] -> \n  -- TableScan[0][table: hive_table, remaining filter: (and(and(and(and(isnotnull(\"hd_vehicle_count\"),or(equalto(\"hd_buy_potential\",\">10000\"),equalto(\"hd_buy_potential\",\"unknown\"))),greaterthan(\"hd_vehicle_count\",0)),if(greaterthan(\"hd_vehicle_count\",0),greaterthan(divide(cast \"hd_dep_count\" as DOUBLE,cast \"hd_vehicle_count\" as DOUBLE),1.2))),isnotnull(\"hd_demo_sk\")))] -> n0_0:BIGINT, n0_1:VARCHAR, n0_2:BIGINT, n0_3:BIGINT\n",
       planNode->toString(true, true));
 }
 
@@ -277,8 +272,7 @@ TEST_F(Substrait2VeloxPlanConversionTest, filterUpper) {
   // Convert to Velox PlanNode.
   auto planNode = planConverter_->toVeloxPlan(substraitPlan, std::vector<::substrait::ReadRel_LocalFiles>{split});
   ASSERT_EQ(
-      "-- Project[1][expressions: ] -> \n  -- TableScan[0][table: hive_table, range filters: "
-      "[(key, BigintRange: [-2147483648, 2] no nulls)]] -> n0_0:INTEGER\n",
+      "-- Project[1][expressions: ] -> \n  -- TableScan[0][table: hive_table, remaining filter: (and(isnotnull(\"key\"),lessthan(\"key\",3)))] -> n0_0:INTEGER\n",
       planNode->toString(true, true));
 }
 } // namespace gluten
diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh
index d802838b4b..2be1481fc0 100755
--- a/ep/build-velox/src/get_velox.sh
+++ b/ep/build-velox/src/get_velox.sh
@@ -17,7 +17,7 @@
 set -exu
 
 VELOX_REPO=https://github.com/oap-project/velox.git
-VELOX_BRANCH=2024_11_13
+VELOX_BRANCH=2024_11_13_new
 VELOX_HOME=""
 
 OS=`uname -s`


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to