This is an automated email from the ASF dual-hosted git repository.

zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ce0d97a0b2 [VL] Enable AtLeastNNonNulls function (#7326)
ce0d97a0b2 is described below

commit ce0d97a0b282c65ced6add5cc795d8b70cb41150
Author: Zhen Li <[email protected]>
AuthorDate: Thu Sep 26 10:26:51 2024 +0800

    [VL] Enable AtLeastNNonNulls function (#7326)
    
    [VL] Enable AtLeastNNonNulls function.
    Related Velox PR: https://github.com/facebookincubator/velox/pull/10508
---
 .../org/apache/gluten/utils/CHExpressionUtil.scala  |  1 +
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala   | 10 ++++++++++
 .../execution/ScalarFunctionsValidateSuite.scala    | 21 +++++++++++++++++++++
 .../gluten/backendsapi/SparkPlanExecApi.scala       |  7 +++++++
 .../gluten/expression/ExpressionConverter.scala     |  6 ++++++
 .../gluten/expression/ExpressionMappings.scala      |  1 +
 .../apache/gluten/expression/ExpressionNames.scala  |  1 +
 7 files changed, 47 insertions(+)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala
index 645189310a..e58db43b26 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala
@@ -198,6 +198,7 @@ object CHExpressionUtil {
     DATE_FROM_UNIX_DATE -> DefaultValidator(),
     MONOTONICALLY_INCREASING_ID -> DefaultValidator(),
     SPARK_PARTITION_ID -> DefaultValidator(),
+    AT_LEAST_N_NON_NULLS -> DefaultValidator(),
     URL_DECODE -> DefaultValidator(),
     URL_ENCODE -> DefaultValidator(),
     FORMAT_STRING -> FormatStringValidator(),
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 64c212690f..d30caa1779 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -84,6 +84,16 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       newExpr)
   }
 
+  override def genAtLeastNNonNullsTransformer(
+      substraitExprName: String,
+      children: Seq[ExpressionTransformer],
+      original: AtLeastNNonNulls): ExpressionTransformer = {
+    GenericExpressionTransformer(
+      substraitExprName,
+      Seq(LiteralTransformer(Literal(original.n))) ++ children,
+      original)
+  }
+
   /** Transform Uuid to Substrait. */
   override def genUuidTransformer(
       substraitExprName: String,
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
index 05ef86e5e6..f9f9fe8e0c 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
@@ -1417,4 +1417,25 @@ abstract class ScalarFunctionsValidateSuite extends 
FunctionsValidateSuite {
     // Scale < 0 should round down even on integral values
     compareResultsAgainstVanillaSpark("select round(44, -1)", true, { _ => })
   }
+
+  test("test internal function: AtLeastNNonNulls") {
+    // AtLeastNNonNulls is generated by DataFrameNaFunctions.drop (i.e. df.na.drop)
+    withTempPath {
+      path =>
+        val input = Seq[(String, java.lang.Integer, java.lang.Double)](
+          ("Bob", 16, 176.5),
+          ("Alice", null, 164.3),
+          ("David", 60, null),
+          ("Nina", 25, Double.NaN),
+          ("Amy", null, null),
+          (null, null, null)
+        ).toDF("name", "age", "height")
+        val rows = input.collect()
+        input.write.parquet(path.getCanonicalPath)
+
+        val df = spark.read.parquet(path.getCanonicalPath).na.drop(2, 
Seq("age", "height"))
+        checkAnswer(df, rows(0) :: Nil)
+        checkGlutenOperatorMatch[FilterExecTransformer](df)
+    }
+  }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index f95fd23891..667c0bdc25 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -178,6 +178,13 @@ trait SparkPlanExecApi {
     GenericExpressionTransformer(substraitExprName, Seq(left, right), original)
   }
 
+  def genAtLeastNNonNullsTransformer(
+      substraitExprName: String,
+      children: Seq[ExpressionTransformer],
+      original: AtLeastNNonNulls): ExpressionTransformer = {
+    throw new GlutenNotSupportException("AtLeastNNonNulls is not supported")
+  }
+
   def genUuidTransformer(substraitExprName: String, original: Uuid): 
ExpressionTransformer = {
     GenericExpressionTransformer(substraitExprName, Seq(), original)
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index 541f29fe31..72586b0349 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -506,6 +506,12 @@ object ExpressionConverter extends SQLConfHelper with 
Logging {
           replaceWithExpressionTransformer0(n.right, attributeSeq, 
expressionsMap),
           n
         )
+      case a: AtLeastNNonNulls =>
+        
BackendsApiManager.getSparkPlanExecApiInstance.genAtLeastNNonNullsTransformer(
+          substraitExprName,
+          a.children.map(replaceWithExpressionTransformer0(_, attributeSeq, 
expressionsMap)),
+          a
+        )
       case m: MakeTimestamp =>
         
BackendsApiManager.getSparkPlanExecApiInstance.genMakeTimestampTransformer(
           substraitExprName,
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index f3896cf954..176cf575c2 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -287,6 +287,7 @@ object ExpressionMappings {
     Sig[PromotePrecision](PROMOTE_PRECISION),
     Sig[MonotonicallyIncreasingID](MONOTONICALLY_INCREASING_ID),
     Sig[SparkPartitionID](SPARK_PARTITION_ID),
+    Sig[AtLeastNNonNulls](AT_LEAST_N_NON_NULLS),
     Sig[WidthBucket](WIDTH_BUCKET),
     Sig[ReplicateRows](REPLICATE_ROWS),
     Sig[RaiseError](RAISE_ERROR),
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
 
b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
index a4f2e7174d..32ac1914e6 100644
--- 
a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
+++ 
b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
@@ -324,6 +324,7 @@ object ExpressionNames {
   final val REPLICATE_ROWS = "replicaterows"
   final val RAISE_ERROR = "raise_error"
   final val VERSION = "version"
+  final val AT_LEAST_N_NON_NULLS = "at_least_n_non_nulls"
 
   // Directly use child expression transformer
   final val KNOWN_NULLABLE = "known_nullable"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to