This is an automated email from the ASF dual-hosted git repository.

rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c4566ebc1 [GLUTEN-5965][VL] Support the pushdown "NOT IN" filter 
(#5966)
c4566ebc1 is described below

commit c4566ebc1bb4264f14858b313d8565f4268ff3ff
Author: WangGuangxin <[email protected]>
AuthorDate: Fri Jun 14 08:55:38 2024 +0800

    [GLUTEN-5965][VL] Support the pushdown "NOT IN" filter (#5966)
---
 .../org/apache/gluten/execution/TestOperator.scala |  65 ++++++++++++
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        | 110 ++++++++++++++++-----
 cpp/velox/substrait/SubstraitToVeloxPlan.h         |  23 ++++-
 3 files changed, 169 insertions(+), 29 deletions(-)

diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala 
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 3cf485aac..a892b6f31 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -157,6 +157,71 @@ class TestOperator extends VeloxWholeStageTransformerSuite 
with AdaptiveSparkPla
     checkLengthAndPlan(df, 60141)
   }
 
+  test("not in") {
+    // integral type: single NOT IN, two NOT INs, and NOT IN combined with !=
+    val df = runQueryAndCompare(
+      "select l_orderkey from lineitem " +
+        "where l_partkey not in (1552, 674, 1062)") {
+      checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+    }
+    checkLengthAndPlan(df, 60053)
+
+    val df2 = runQueryAndCompare(
+      "select l_orderkey from lineitem " +
+        "where l_partkey not in (1552, 674) and l_partkey not in (1062)") {
+      checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+    }
+    checkLengthAndPlan(df2, 60053)
+
+    val df3 = runQueryAndCompare(
+      "select l_orderkey from lineitem " +
+        "where l_partkey not in (1552, 674) and l_partkey != 1062") {
+      checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+    }
+    checkLengthAndPlan(df3, 60053)
+
+    // string type
+    val df4 =
+      runQueryAndCompare("select o_orderstatus from orders where o_orderstatus 
not in ('O', 'F')") {
+        checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+      }
+    checkLengthAndPlan(df4, 363)
+
+    // bool type (the table data includes a NULL row)
+    withTable("t") {
+      sql("create table t (id int, b boolean) using parquet")
+      sql("insert into t values (1, true), (2, false), (3, null)")
+      runQueryAndCompare("select * from t where b not in (true)") {
+        checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+      }
+
+      runQueryAndCompare("select * from t where b not in (true, false)") {
+        checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+      }
+    }
+
+    // mix not-in with range
+    runQueryAndCompare(
+      "select l_orderkey from lineitem " +
+        "where l_partkey not in (1552, 674) and l_partkey >= 1552") {
+      checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+    }
+
+    // mix not-in with in
+    runQueryAndCompare(
+      "select l_orderkey from lineitem " +
+        "where l_partkey not in (1552, 674) and l_partkey in (1552)") {
+      checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+    }
+
+    // not-in with or relation
+    runQueryAndCompare(
+      "select l_orderkey from lineitem " +
+        "where l_partkey not in (1552, 674) or l_partkey in (1552)") {
+      checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+    }
+  }
+
   test("coalesce") {
     var df = runQueryAndCompare(
       "select l_orderkey, coalesce(l_comment, 'default_val') " +
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 4e875d479..8b8a92624 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -20,6 +20,7 @@
 #include "VariantToVectorConverter.h"
 #include "velox/connectors/hive/HiveDataSink.h"
 #include "velox/exec/TableWriter.h"
+#include "velox/type/Filter.h"
 #include "velox/type/Type.h"
 
 #include "utils/ConfigExtractor.h"
@@ -1465,10 +1466,12 @@ connector::hive::SubfieldFilters 
SubstraitToVeloxPlanConverter::createSubfieldFi
       auto expr = scalarFunction.arguments()[0].value();
       if (expr.has_scalar_function()) {
         // Set its child to filter info with reverse enabled.
-        setFilterInfo(scalarFunction.arguments()[0].value().scalar_function(), 
inputTypeList, columnToFilterInfo, true);
+        setFilterInfo(expr.scalar_function(), inputTypeList, 
columnToFilterInfo, true);
+      } else if (expr.has_singular_or_list()) {
+        auto singularOrList = expr.singular_or_list();
+        setFilterInfo(singularOrList, columnToFilterInfo, true);
       } else {
-        // TODO: support push down of Not In.
-        VELOX_NYI("Scalar function expected.");
+        VELOX_NYI("Only support push down Not with scalar function or In.");
       }
     } else if (filterName == sOr) {
       VELOX_CHECK(scalarFunction.arguments().size() == 2);
@@ -1593,24 +1596,26 @@ bool SubstraitToVeloxPlanConverter::canPushdownNot(
     std::vector<RangeRecorder>& rangeRecorders) {
   VELOX_CHECK(scalarFunction.arguments().size() == 1, "Only one arg is 
expected for Not.");
   const auto& notArg = scalarFunction.arguments()[0];
-  if (!notArg.value().has_scalar_function()) {
-    // Not for a Boolean Literal or Or List is not supported curretly.
-    // It can be pushed down with an AlwaysTrue or AlwaysFalse Range.
-    return false;
-  }
-
-  auto argFunction =
-      SubstraitParser::findFunctionSpec(functionMap_, 
notArg.value().scalar_function().function_reference());
-  auto functionName = SubstraitParser::getNameBeforeDelimiter(argFunction);
+  if (notArg.value().has_singular_or_list()) {
+    auto singularOrList = notArg.value().singular_or_list();
+    if (!canPushdownSingularOrList(singularOrList)) {
+      return false;
+    }
+    uint32_t colIdx = getColumnIndexFromSingularOrList(singularOrList);
+    return rangeRecorders.at(colIdx).setInRange();
+  } else if (notArg.value().has_scalar_function()) {
+    auto argFunction =
+        SubstraitParser::findFunctionSpec(functionMap_, 
notArg.value().scalar_function().function_reference());
+    auto functionName = SubstraitParser::getNameBeforeDelimiter(argFunction);
 
-  static const std::unordered_set<std::string> supportedNotFunctions = {sGte, 
sGt, sLte, sLt, sEqual};
+    static const std::unordered_set<std::string> supportedNotFunctions = 
{sGte, sGt, sLte, sLt, sEqual};
 
-  uint32_t fieldIdx;
-  bool isFieldOrWithLiteral = 
fieldOrWithLiteral(notArg.value().scalar_function().arguments(), fieldIdx);
+    uint32_t fieldIdx;
+    bool isFieldOrWithLiteral = 
fieldOrWithLiteral(notArg.value().scalar_function().arguments(), fieldIdx);
 
-  if (supportedNotFunctions.find(functionName) != supportedNotFunctions.end() 
&& isFieldOrWithLiteral &&
-      rangeRecorders.at(fieldIdx).setCertainRangeForFunction(functionName, 
true /*reverse*/)) {
-    return true;
+    return (
+        supportedNotFunctions.find(functionName) != 
supportedNotFunctions.end() && isFieldOrWithLiteral &&
+        rangeRecorders.at(fieldIdx).setCertainRangeForFunction(functionName, 
true /*reverse*/));
   }
   return false;
 }
@@ -1966,6 +1971,7 @@ template <TypeKind KIND>
 void SubstraitToVeloxPlanConverter::setInFilter(
     const std::vector<variant>& variants,
     bool nullAllowed,
+    bool negated,
     const std::string& inputName,
     connector::hive::SubfieldFilters& filters) {}
 
@@ -1973,6 +1979,7 @@ template <>
 void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::BIGINT>(
     const std::vector<variant>& variants,
     bool nullAllowed,
+    bool negated,
     const std::string& inputName,
     connector::hive::SubfieldFilters& filters) {
   std::vector<int64_t> values;
@@ -1981,13 +1988,18 @@ void 
SubstraitToVeloxPlanConverter::setInFilter<TypeKind::BIGINT>(
     int64_t value = variant.value<int64_t>();
     values.emplace_back(value);
   }
-  filters[common::Subfield(inputName)] = common::createBigintValues(values, 
nullAllowed);
+  if (negated) {
+    filters[common::Subfield(inputName)] = 
common::createNegatedBigintValues(values, nullAllowed);
+  } else {
+    filters[common::Subfield(inputName)] = common::createBigintValues(values, 
nullAllowed);
+  }
 }
 
 template <>
 void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::INTEGER>(
     const std::vector<variant>& variants,
     bool nullAllowed,
+    bool negated,
     const std::string& inputName,
     connector::hive::SubfieldFilters& filters) {
   // Use bigint values for int type.
@@ -1998,13 +2010,18 @@ void 
SubstraitToVeloxPlanConverter::setInFilter<TypeKind::INTEGER>(
     int64_t value = variant.value<int32_t>();
     values.emplace_back(value);
   }
-  filters[common::Subfield(inputName)] = common::createBigintValues(values, 
nullAllowed);
+  if (negated) {
+    filters[common::Subfield(inputName)] = 
common::createNegatedBigintValues(values, nullAllowed);
+  } else {
+    filters[common::Subfield(inputName)] = common::createBigintValues(values, 
nullAllowed);
+  }
 }
 
 template <>
 void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::SMALLINT>(
     const std::vector<variant>& variants,
     bool nullAllowed,
+    bool negated,
     const std::string& inputName,
     connector::hive::SubfieldFilters& filters) {
   // Use bigint values for small int type.
@@ -2015,13 +2032,18 @@ void 
SubstraitToVeloxPlanConverter::setInFilter<TypeKind::SMALLINT>(
     int64_t value = variant.value<int16_t>();
     values.emplace_back(value);
   }
-  filters[common::Subfield(inputName)] = common::createBigintValues(values, 
nullAllowed);
+  if (negated) {
+    filters[common::Subfield(inputName)] = 
common::createNegatedBigintValues(values, nullAllowed);
+  } else {
+    filters[common::Subfield(inputName)] = common::createBigintValues(values, 
nullAllowed);
+  }
 }
 
 template <>
 void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::TINYINT>(
     const std::vector<variant>& variants,
     bool nullAllowed,
+    bool negated,
     const std::string& inputName,
     connector::hive::SubfieldFilters& filters) {
   // Use bigint values for tiny int type.
@@ -2032,13 +2054,18 @@ void 
SubstraitToVeloxPlanConverter::setInFilter<TypeKind::TINYINT>(
     int64_t value = variant.value<int8_t>();
     values.emplace_back(value);
   }
-  filters[common::Subfield(inputName)] = common::createBigintValues(values, 
nullAllowed);
+  if (negated) {
+    filters[common::Subfield(inputName)] = 
common::createNegatedBigintValues(values, nullAllowed);
+  } else {
+    filters[common::Subfield(inputName)] = common::createBigintValues(values, 
nullAllowed);
+  }
 }
 
 template <>
 void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::VARCHAR>(
     const std::vector<variant>& variants,
     bool nullAllowed,
+    bool negated,
     const std::string& inputName,
     connector::hive::SubfieldFilters& filters) {
   std::vector<std::string> values;
@@ -2047,7 +2074,11 @@ void 
SubstraitToVeloxPlanConverter::setInFilter<TypeKind::VARCHAR>(
     std::string value = variant.value<std::string>();
     values.emplace_back(value);
   }
-  filters[common::Subfield(inputName)] = 
std::make_unique<common::BytesValues>(values, nullAllowed);
+  if (negated) {
+    filters[common::Subfield(inputName)] = 
std::make_unique<common::NegatedBytesValues>(values, nullAllowed);
+  } else {
+    filters[common::Subfield(inputName)] = 
std::make_unique<common::BytesValues>(values, nullAllowed);
+  }
 }
 
 template <TypeKind KIND, typename FilterType>
@@ -2102,6 +2133,17 @@ void 
SubstraitToVeloxPlanConverter::constructSubfieldFilters(
     if (filterInfo.notValue_) {
       filters[common::Subfield(inputName)] =
           
std::make_unique<common::BoolValue>(!filterInfo.notValue_.value().value<bool>(),
 nullAllowed);
+    } else if (filterInfo.notValues_.size() > 0) {
+      std::set<bool> notValues;
+      for (auto v : filterInfo.notValues_) {
+        notValues.emplace(v.value<bool>());
+      }
+      if (notValues.size() == 1) {
+        filters[common::Subfield(inputName)] = 
std::make_unique<common::BoolValue>(!(*notValues.begin()), nullAllowed);
+      } else {
+        // if there are more than one distinct value in NOT IN list, the 
filter should be AlwaysFalse
+        filters[common::Subfield(inputName)] = 
std::make_unique<common::AlwaysFalse>();
+      }
     } else if (rangeSize == 0) {
       // IsNull/IsNotNull.
       if (!nullAllowed) {
@@ -2140,11 +2182,22 @@ void 
SubstraitToVeloxPlanConverter::constructSubfieldFilters(
     if (filterInfo.values_.size() > 0) {
       // To filter out null is a default behaviour of Spark IN expression.
       nullAllowed = false;
-      setInFilter<KIND>(filterInfo.values_, nullAllowed, inputName, filters);
+      setInFilter<KIND>(filterInfo.values_, nullAllowed, false, inputName, 
filters);
       // Currently, In cannot coexist with other filter conditions
       // due to multirange is in 'OR' relation but 'AND' is needed.
       VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot 
be supported after IN filter.");
       VELOX_CHECK(!filterInfo.notValue_.has_value(), "Not equal cannot be 
supported after IN filter.");
+      VELOX_CHECK(filterInfo.notValues_.size() == 0, "Not in cannot be 
supported after IN filter.");
+      return;
+    }
+
+    // Handle not in filter.
+    if (filterInfo.notValues_.size() > 0) {
+      setInFilter<KIND>(filterInfo.notValues_, filterInfo.nullAllowed_, true, 
inputName, filters);
+      // Currently, NOT In cannot coexist with other filter conditions
+      // due to multirange is in 'OR' relation but 'AND' is needed.
+      VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot 
be supported after NOT IN filter.");
+      VELOX_CHECK(!filterInfo.notValue_.has_value(), "Not equal cannot be 
supported after NOT IN filter.");
       return;
     }
 
@@ -2429,7 +2482,8 @@ uint32_t 
SubstraitToVeloxPlanConverter::getColumnIndexFromSingularOrList(
 
 void SubstraitToVeloxPlanConverter::setFilterInfo(
     const ::substrait::Expression_SingularOrList& singularOrList,
-    std::vector<FilterInfo>& columnToFilterInfo) {
+    std::vector<FilterInfo>& columnToFilterInfo,
+    bool reverse) {
   VELOX_CHECK(singularOrList.options_size() > 0, "At least one option is 
expected.");
   // Get the column index.
   uint32_t colIdx = getColumnIndexFromSingularOrList(singularOrList);
@@ -2443,7 +2497,11 @@ void SubstraitToVeloxPlanConverter::setFilterInfo(
     
variants.emplace_back(exprConverter_->toVeloxExpr(option.literal())->value());
   }
   // Set the value list to filter info.
-  columnToFilterInfo[colIdx].setValues(variants);
+  if (!reverse) {
+    columnToFilterInfo[colIdx].setValues(variants);
+  } else {
+    columnToFilterInfo[colIdx].setNotValues(variants);
+  }
 }
 
 } // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h 
b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 3a0e677af..1535b1f85 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -377,6 +377,16 @@ class SubstraitToVeloxPlanConverter {
       }
     }
 
+    // Append the values excluded by a pushed-down 'not in'; repeated calls accumulate and mark this info initialized.
+    void setNotValues(const std::vector<variant>& notValues) {
+      for (const auto& value : notValues) {
+        notValues_.emplace_back(value); // appended, existing values are kept
+      }
+      if (!initialized_) {
+        initialized_ = true;
+      }
+    }
+
     // Whether this filter map is initialized.
     bool initialized_ = false;
 
@@ -402,6 +412,9 @@ class SubstraitToVeloxPlanConverter {
 
     // The list of values used in 'in' expression.
     std::vector<variant> values_;
+
+    // The list of values the column must not equal (pushed-down 'not in').
+    std::vector<variant> notValues_;
   };
 
   /// Returns unique ID to use for plan node. Produces sequential numbers
@@ -464,9 +477,11 @@ class SubstraitToVeloxPlanConverter {
       bool reverse = false);
 
   /// Extract SingularOrList and set it to the filter info map.
+  /// If reverse is true, the opposite filter info will be set.
   void setFilterInfo(
       const ::substrait::Expression_SingularOrList& singularOrList,
-      std::vector<FilterInfo>& columnToFilterInfo);
+      std::vector<FilterInfo>& columnToFilterInfo,
+      bool reverse = false);
 
   /// Extract SingularOrList and returns the field index.
   static uint32_t getColumnIndexFromSingularOrList(const 
::substrait::Expression_SingularOrList&);
@@ -484,13 +499,15 @@ class SubstraitToVeloxPlanConverter {
   template <TypeKind KIND, typename FilterType>
   void createNotEqualFilter(variant notVariant, bool nullAllowed, 
std::vector<std::unique_ptr<FilterType>>& colFilters);
 
-  /// Create a values range to handle in filter.
-  /// variants: the list of values extracted from the in expression.
+  /// Create a values filter (negated for NOT IN) to handle an (not) in expression.
+  /// variants: the list of values extracted from the (not) in expression.
+  /// negated: false for IN filter, true for NOT IN filter.
   /// inputName: the column input name.
   template <TypeKind KIND>
   void setInFilter(
       const std::vector<variant>& variants,
       bool nullAllowed,
+      bool negated,
       const std::string& inputName,
       connector::hive::SubfieldFilters& filters);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to