This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a03606675b [VL] Enable base64 and unbase64 functions (#9596)
a03606675b is described below
commit a03606675bcc42a280d466a29994c8b27527ea7f
Author: Zhen Li <[email protected]>
AuthorDate: Thu Jul 10 00:35:01 2025 +0800
[VL] Enable base64 and unbase64 functions (#9596)
This patch enables the base64 and unbase64 functions. Support for "failOnError = true"
(currently rejected for unbase64) will be added in a follow-up PR.
---
.../apache/gluten/execution/VeloxStringFunctionsSuite.scala | 6 ++++++
cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc | 2 +-
.../org/apache/gluten/expression/ExpressionConverter.scala | 13 +++++++++++++
.../main/scala/org/apache/gluten/sql/shims/SparkShims.scala | 4 +++-
.../org/apache/gluten/sql/shims/spark34/Spark34Shims.scala | 1 +
.../org/apache/gluten/sql/shims/spark35/Spark35Shims.scala | 2 ++
6 files changed, 26 insertions(+), 2 deletions(-)
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala
index ec3a6079fe..24fb04f4c7 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala
@@ -671,4 +671,10 @@ class VeloxStringFunctionsSuite extends
VeloxWholeStageTransformerSuite {
s"select l_orderkey, luhn_check(l_comment) " +
s"from $LINEITEM_TABLE limit
$LENGTH")(checkGlutenOperatorMatch[ProjectExecTransformer])
}
+
+ test("base64 and unbase64") {
+ runQueryAndCompare(
+ s"select l_orderkey, unbase64(base64(l_comment)) " +
+ s"from $LINEITEM_TABLE limit
$LENGTH")(checkGlutenOperatorMatch[ProjectExecTransformer])
+ }
}
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index c5aa800b10..5a73132f2f 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -59,7 +59,7 @@ const std::unordered_set<std::string> kRegexFunctions = {
"rlike"};
const std::unordered_set<std::string> kBlackList =
- {"split_part", "sequence", "approx_percentile", "get_array_struct_fields",
"map_from_arrays", "base64", "unbase64"};
+ {"split_part", "sequence", "approx_percentile", "get_array_struct_fields",
"map_from_arrays"};
} // namespace
bool SubstraitToVeloxPlanValidator::parseVeloxType(
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index 26dcf90124..90e8f168b6 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -172,6 +172,17 @@ object ExpressionConverter extends SQLConfHelper with
Logging {
ExpressionNames.LUHN_CHECK,
replaceWithExpressionTransformer0(i.arguments.head, attributeSeq,
expressionsMap),
i)
+ case i @ StaticInvoke(_, _, "encode" | "decode", Seq(_, _), _, _, _, _)
+ if i.objectName.endsWith("Base64") =>
+ if (!SQLConf.get.getConfString("spark.sql.chunkBase64String.enabled",
"true").toBoolean) {
+ throw new GlutenNotSupportException(
+ "Base64 with chunkBase64String disabled is not supported in
gluten.")
+ }
+ return GenericExpressionTransformer(
+ ExpressionNames.BASE64,
+ replaceWithExpressionTransformer0(i.arguments.head, attributeSeq,
expressionsMap),
+ i
+ )
case StaticInvoke(clz, _, functionName, _, _, _, _, _) =>
throw new GlutenNotSupportException(
s"Not supported to transform StaticInvoke with object:
${clz.getName}, " +
@@ -753,6 +764,8 @@ object ExpressionConverter extends SQLConfHelper with
Logging {
substraitExprName,
expr.children.map(replaceWithExpressionTransformer0(_, attributeSeq,
expressionsMap)),
j)
+ case u: UnBase64 if
SparkShimLoader.getSparkShims.unBase64FunctionFailsOnError(u) =>
+ throw new GlutenNotSupportException("UnBase64 with failOnError is not
supported in gluten.")
case ce if
BackendsApiManager.getSparkPlanExecApiInstance.expressionFlattenSupported(ce) =>
replaceFlattenedExpressionWithExpressionTransformer(
substraitExprName,
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 942c74e41c..7837b2e0d6 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.expressions.{Attribute, BinaryExpression,
Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BinaryExpression,
Expression, UnBase64}
import
org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -309,4 +309,6 @@ trait SparkShims {
Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]]
def getCollectLimitOffset(plan: CollectLimitExec): Int = 0
+
+ def unBase64FunctionFailsOnError(unBase64: UnBase64): Boolean = false
}
diff --git
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 5e08764a04..bec95d4984 100644
---
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -622,4 +622,5 @@ class Spark34Shims extends SparkShims {
plan.offset
}
+ override def unBase64FunctionFailsOnError(unBase64: UnBase64): Boolean =
unBase64.failOnError
}
diff --git
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index f953fd90fe..93a473ce40 100644
---
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -682,4 +682,6 @@ class Spark35Shims extends SparkShims {
override def getCollectLimitOffset(plan: CollectLimitExec): Int = {
plan.offset
}
+
+ override def unBase64FunctionFailsOnError(unBase64: UnBase64): Boolean =
unBase64.failOnError
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]