This is an automated email from the ASF dual-hosted git repository.
rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c4566ebc1 [GLUTEN-5965][VL] Support the pushdown "NOT IN" filter
(#5966)
c4566ebc1 is described below
commit c4566ebc1bb4264f14858b313d8565f4268ff3ff
Author: WangGuangxin <[email protected]>
AuthorDate: Fri Jun 14 08:55:38 2024 +0800
[GLUTEN-5965][VL] Support the pushdown "NOT IN" filter (#5966)
---
.../org/apache/gluten/execution/TestOperator.scala | 65 ++++++++++++
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 110 ++++++++++++++++-----
cpp/velox/substrait/SubstraitToVeloxPlan.h | 23 ++++-
3 files changed, 169 insertions(+), 29 deletions(-)
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 3cf485aac..a892b6f31 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -157,6 +157,71 @@ class TestOperator extends VeloxWholeStageTransformerSuite
with AdaptiveSparkPla
checkLengthAndPlan(df, 60141)
}
+ test("not in") {
+ // integral type: a single NOT IN, two AND-ed NOT INs, and NOT IN AND != all exclude the same three keys
+ val df = runQueryAndCompare(
+ "select l_orderkey from lineitem " +
+ "where l_partkey not in (1552, 674, 1062)") {
+ checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+ }
+ checkLengthAndPlan(df, 60053)
+
+ val df2 = runQueryAndCompare(
+ "select l_orderkey from lineitem " +
+ "where l_partkey not in (1552, 674) and l_partkey not in (1062)") {
+ checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+ }
+ checkLengthAndPlan(df2, 60053)
+
+ val df3 = runQueryAndCompare(
+ "select l_orderkey from lineitem " +
+ "where l_partkey not in (1552, 674) and l_partkey != 1062") {
+ checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+ }
+ checkLengthAndPlan(df3, 60053)
+
+ // string type: NOT IN over a string column
+ val df4 =
+ runQueryAndCompare("select o_orderstatus from orders where o_orderstatus
not in ('O', 'F')") {
+ checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+ }
+ checkLengthAndPlan(df4, 363)
+
+ // bool type: the table includes a NULL row; cover NOT IN with one value and with both boolean values
+ withTable("t") {
+ sql("create table t (id int, b boolean) using parquet")
+ sql("insert into t values (1, true), (2, false), (3, null)")
+ runQueryAndCompare("select * from t where b not in (true)") {
+ checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+ }
+
+ runQueryAndCompare("select * from t where b not in (true, false)") {
+ checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+ }
+ }
+
+ // mix not-in with a range predicate on the same column
+ runQueryAndCompare(
+ "select l_orderkey from lineitem " +
+ "where l_partkey not in (1552, 674) and l_partkey >= 1552") {
+ checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+ }
+
+ // mix not-in with an in predicate on the same column
+ runQueryAndCompare(
+ "select l_orderkey from lineitem " +
+ "where l_partkey not in (1552, 674) and l_partkey in (1552)") {
+ checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+ }
+
+ // not-in OR-ed with an in predicate on the same column
+ runQueryAndCompare(
+ "select l_orderkey from lineitem " +
+ "where l_partkey not in (1552, 674) or l_partkey in (1552)") {
+ checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+ }
+ }
+
test("coalesce") {
var df = runQueryAndCompare(
"select l_orderkey, coalesce(l_comment, 'default_val') " +
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 4e875d479..8b8a92624 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -20,6 +20,7 @@
#include "VariantToVectorConverter.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/exec/TableWriter.h"
+#include "velox/type/Filter.h"
#include "velox/type/Type.h"
#include "utils/ConfigExtractor.h"
@@ -1465,10 +1466,12 @@ connector::hive::SubfieldFilters
SubstraitToVeloxPlanConverter::createSubfieldFi
auto expr = scalarFunction.arguments()[0].value();
if (expr.has_scalar_function()) {
// Set its child to filter info with reverse enabled.
- setFilterInfo(scalarFunction.arguments()[0].value().scalar_function(),
inputTypeList, columnToFilterInfo, true);
+ setFilterInfo(expr.scalar_function(), inputTypeList,
columnToFilterInfo, true);
+ } else if (expr.has_singular_or_list()) {
+ auto singularOrList = expr.singular_or_list();
+ setFilterInfo(singularOrList, columnToFilterInfo, true);
} else {
- // TODO: support push down of Not In.
- VELOX_NYI("Scalar function expected.");
+ VELOX_NYI("Only support push down Not with scalar function or In.");
}
} else if (filterName == sOr) {
VELOX_CHECK(scalarFunction.arguments().size() == 2);
@@ -1593,24 +1596,26 @@ bool SubstraitToVeloxPlanConverter::canPushdownNot(
std::vector<RangeRecorder>& rangeRecorders) {
VELOX_CHECK(scalarFunction.arguments().size() == 1, "Only one arg is
expected for Not.");
const auto& notArg = scalarFunction.arguments()[0];
- if (!notArg.value().has_scalar_function()) {
- // Not for a Boolean Literal or Or List is not supported curretly.
- // It can be pushed down with an AlwaysTrue or AlwaysFalse Range.
- return false;
- }
-
- auto argFunction =
- SubstraitParser::findFunctionSpec(functionMap_,
notArg.value().scalar_function().function_reference());
- auto functionName = SubstraitParser::getNameBeforeDelimiter(argFunction);
+ if (notArg.value().has_singular_or_list()) {
+ auto singularOrList = notArg.value().singular_or_list();
+ if (!canPushdownSingularOrList(singularOrList)) {
+ return false;
+ }
+ uint32_t colIdx = getColumnIndexFromSingularOrList(singularOrList);
+ return rangeRecorders.at(colIdx).setInRange();
+ } else if (notArg.value().has_scalar_function()) {
+ auto argFunction =
+ SubstraitParser::findFunctionSpec(functionMap_,
notArg.value().scalar_function().function_reference());
+ auto functionName = SubstraitParser::getNameBeforeDelimiter(argFunction);
- static const std::unordered_set<std::string> supportedNotFunctions = {sGte,
sGt, sLte, sLt, sEqual};
+ static const std::unordered_set<std::string> supportedNotFunctions =
{sGte, sGt, sLte, sLt, sEqual};
- uint32_t fieldIdx;
- bool isFieldOrWithLiteral =
fieldOrWithLiteral(notArg.value().scalar_function().arguments(), fieldIdx);
+ uint32_t fieldIdx;
+ bool isFieldOrWithLiteral =
fieldOrWithLiteral(notArg.value().scalar_function().arguments(), fieldIdx);
- if (supportedNotFunctions.find(functionName) != supportedNotFunctions.end()
&& isFieldOrWithLiteral &&
- rangeRecorders.at(fieldIdx).setCertainRangeForFunction(functionName,
true /*reverse*/)) {
- return true;
+ return (
+ supportedNotFunctions.find(functionName) !=
supportedNotFunctions.end() && isFieldOrWithLiteral &&
+ rangeRecorders.at(fieldIdx).setCertainRangeForFunction(functionName,
true /*reverse*/));
}
return false;
}
@@ -1966,6 +1971,7 @@ template <TypeKind KIND>
void SubstraitToVeloxPlanConverter::setInFilter(
const std::vector<variant>& variants,
bool nullAllowed,
+ bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters) {}
@@ -1973,6 +1979,7 @@ template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::BIGINT>(
const std::vector<variant>& variants,
bool nullAllowed,
+ bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters) {
std::vector<int64_t> values;
@@ -1981,13 +1988,18 @@ void
SubstraitToVeloxPlanConverter::setInFilter<TypeKind::BIGINT>(
int64_t value = variant.value<int64_t>();
values.emplace_back(value);
}
- filters[common::Subfield(inputName)] = common::createBigintValues(values,
nullAllowed);
+ if (negated) {
+ filters[common::Subfield(inputName)] =
common::createNegatedBigintValues(values, nullAllowed);
+ } else {
+ filters[common::Subfield(inputName)] = common::createBigintValues(values,
nullAllowed);
+ }
}
template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::INTEGER>(
const std::vector<variant>& variants,
bool nullAllowed,
+ bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters) {
// Use bigint values for int type.
@@ -1998,13 +2010,18 @@ void
SubstraitToVeloxPlanConverter::setInFilter<TypeKind::INTEGER>(
int64_t value = variant.value<int32_t>();
values.emplace_back(value);
}
- filters[common::Subfield(inputName)] = common::createBigintValues(values,
nullAllowed);
+ if (negated) {
+ filters[common::Subfield(inputName)] =
common::createNegatedBigintValues(values, nullAllowed);
+ } else {
+ filters[common::Subfield(inputName)] = common::createBigintValues(values,
nullAllowed);
+ }
}
template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::SMALLINT>(
const std::vector<variant>& variants,
bool nullAllowed,
+ bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters) {
// Use bigint values for small int type.
@@ -2015,13 +2032,18 @@ void
SubstraitToVeloxPlanConverter::setInFilter<TypeKind::SMALLINT>(
int64_t value = variant.value<int16_t>();
values.emplace_back(value);
}
- filters[common::Subfield(inputName)] = common::createBigintValues(values,
nullAllowed);
+ if (negated) {
+ filters[common::Subfield(inputName)] =
common::createNegatedBigintValues(values, nullAllowed);
+ } else {
+ filters[common::Subfield(inputName)] = common::createBigintValues(values,
nullAllowed);
+ }
}
template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::TINYINT>(
const std::vector<variant>& variants,
bool nullAllowed,
+ bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters) {
// Use bigint values for tiny int type.
@@ -2032,13 +2054,18 @@ void
SubstraitToVeloxPlanConverter::setInFilter<TypeKind::TINYINT>(
int64_t value = variant.value<int8_t>();
values.emplace_back(value);
}
- filters[common::Subfield(inputName)] = common::createBigintValues(values,
nullAllowed);
+ if (negated) {
+ filters[common::Subfield(inputName)] =
common::createNegatedBigintValues(values, nullAllowed);
+ } else {
+ filters[common::Subfield(inputName)] = common::createBigintValues(values,
nullAllowed);
+ }
}
template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::VARCHAR>(
const std::vector<variant>& variants,
bool nullAllowed,
+ bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters) {
std::vector<std::string> values;
@@ -2047,7 +2074,11 @@ void
SubstraitToVeloxPlanConverter::setInFilter<TypeKind::VARCHAR>(
std::string value = variant.value<std::string>();
values.emplace_back(value);
}
- filters[common::Subfield(inputName)] =
std::make_unique<common::BytesValues>(values, nullAllowed);
+ if (negated) {
+ filters[common::Subfield(inputName)] =
std::make_unique<common::NegatedBytesValues>(values, nullAllowed);
+ } else {
+ filters[common::Subfield(inputName)] =
std::make_unique<common::BytesValues>(values, nullAllowed);
+ }
}
template <TypeKind KIND, typename FilterType>
@@ -2102,6 +2133,17 @@ void
SubstraitToVeloxPlanConverter::constructSubfieldFilters(
if (filterInfo.notValue_) {
filters[common::Subfield(inputName)] =
std::make_unique<common::BoolValue>(!filterInfo.notValue_.value().value<bool>(),
nullAllowed);
+ } else if (filterInfo.notValues_.size() > 0) {
+ std::set<bool> notValues;
+ for (auto v : filterInfo.notValues_) {
+ notValues.emplace(v.value<bool>());
+ }
+ if (notValues.size() == 1) {
+ filters[common::Subfield(inputName)] =
std::make_unique<common::BoolValue>(!(*notValues.begin()), nullAllowed);
+ } else {
+ // if there are more than one distinct value in NOT IN list, the
filter should be AlwaysFalse
+ filters[common::Subfield(inputName)] =
std::make_unique<common::AlwaysFalse>();
+ }
} else if (rangeSize == 0) {
// IsNull/IsNotNull.
if (!nullAllowed) {
@@ -2140,11 +2182,22 @@ void
SubstraitToVeloxPlanConverter::constructSubfieldFilters(
if (filterInfo.values_.size() > 0) {
// To filter out null is a default behaviour of Spark IN expression.
nullAllowed = false;
- setInFilter<KIND>(filterInfo.values_, nullAllowed, inputName, filters);
+ setInFilter<KIND>(filterInfo.values_, nullAllowed, false, inputName,
filters);
// Currently, In cannot coexist with other filter conditions
// due to multirange is in 'OR' relation but 'AND' is needed.
VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot
be supported after IN filter.");
VELOX_CHECK(!filterInfo.notValue_.has_value(), "Not equal cannot be
supported after IN filter.");
+ VELOX_CHECK(filterInfo.notValues_.size() == 0, "Not in cannot be
supported after IN filter.");
+ return;
+ }
+
+ // Handle not in filter.
+ if (filterInfo.notValues_.size() > 0) {
+ setInFilter<KIND>(filterInfo.notValues_, filterInfo.nullAllowed_, true,
inputName, filters);
+ // Currently, NOT In cannot coexist with other filter conditions
+ // due to multirange is in 'OR' relation but 'AND' is needed.
+ VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot
be supported after NOT IN filter.");
+ VELOX_CHECK(!filterInfo.notValue_.has_value(), "Not equal cannot be
supported after NOT IN filter.");
return;
}
@@ -2429,7 +2482,8 @@ uint32_t
SubstraitToVeloxPlanConverter::getColumnIndexFromSingularOrList(
void SubstraitToVeloxPlanConverter::setFilterInfo(
const ::substrait::Expression_SingularOrList& singularOrList,
- std::vector<FilterInfo>& columnToFilterInfo) {
+ std::vector<FilterInfo>& columnToFilterInfo,
+ bool reverse) {
VELOX_CHECK(singularOrList.options_size() > 0, "At least one option is
expected.");
// Get the column index.
uint32_t colIdx = getColumnIndexFromSingularOrList(singularOrList);
@@ -2443,7 +2497,11 @@ void SubstraitToVeloxPlanConverter::setFilterInfo(
variants.emplace_back(exprConverter_->toVeloxExpr(option.literal())->value());
}
// Set the value list to filter info.
- columnToFilterInfo[colIdx].setValues(variants);
+ if (!reverse) {
+ columnToFilterInfo[colIdx].setValues(variants);
+ } else {
+ columnToFilterInfo[colIdx].setNotValues(variants);
+ }
}
} // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 3a0e677af..1535b1f85 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -377,6 +377,16 @@ class SubstraitToVeloxPlanConverter {
}
}
+ // Set a list of values to be used in the push down of 'not in' expression.
+ void setNotValues(const std::vector<variant>& notValues) {
+ // Append rather than overwrite: NOT IN predicates on the same column are
+ // AND-ed, and `a NOT IN (x) AND a NOT IN (y)` is `a NOT IN (x, y)`, so the
+ // excluded values accumulate across calls.
+ notValues_.insert(notValues_.end(), notValues.begin(), notValues.end());
+ // Record that this column now carries a filter condition.
+ initialized_ = true;
+ }
+
// Whether this filter map is initialized.
bool initialized_ = false;
@@ -402,6 +412,9 @@ class SubstraitToVeloxPlanConverter {
// The list of values used in 'in' expression.
std::vector<variant> values_;
+
+ // The list of values used in 'not in' expression; the column must equal none of them.
+ std::vector<variant> notValues_;
};
/// Returns unique ID to use for plan node. Produces sequential numbers
@@ -464,9 +477,11 @@ class SubstraitToVeloxPlanConverter {
bool reverse = false);
/// Extract SingularOrList and set it to the filter info map.
+ /// If reverse is true (the list is under a Not), the options are recorded as NOT IN values instead of IN values.
void setFilterInfo(
const ::substrait::Expression_SingularOrList& singularOrList,
- std::vector<FilterInfo>& columnToFilterInfo);
+ std::vector<FilterInfo>& columnToFilterInfo,
+ bool reverse = false);
/// Extract SingularOrList and returns the field index.
static uint32_t getColumnIndexFromSingularOrList(const
::substrait::Expression_SingularOrList&);
@@ -484,13 +499,15 @@ class SubstraitToVeloxPlanConverter {
template <TypeKind KIND, typename FilterType>
void createNotEqualFilter(variant notVariant, bool nullAllowed,
std::vector<std::unique_ptr<FilterType>>& colFilters);
- /// Create a values range to handle in filter.
- /// variants: the list of values extracted from the in expression.
+ /// Create a values range to handle (not) in filter.
+ /// variants: the list of values extracted from the (not) in expression.
+ /// negated: false for IN filter, true for NOT IN filter.
/// inputName: the column input name.
template <TypeKind KIND>
void setInFilter(
const std::vector<variant>& variants,
bool nullAllowed,
+ bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]