This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e62083022 [GLUTEN-7015][VL] Remove udf native registration (#7016)
e62083022 is described below
commit e62083022033ee9c6ee0cf716d864f433a52a9a3
Author: Rong Ma <[email protected]>
AuthorDate: Wed Sep 4 21:39:52 2024 +0800
[GLUTEN-7015][VL] Remove udf native registration (#7016)
---
.../java/org/apache/gluten/udf/UdfJniWrapper.java | 2 +-
.../backendsapi/velox/VeloxListenerApi.scala | 2 +
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 2 -
.../execution/HashAggregateExecTransformer.scala | 17 +-
.../apache/spark/sql/expression/UDFResolver.scala | 36 +---
.../apache/gluten/expression/VeloxUdfSuite.scala | 105 +++++-----
cpp/velox/jni/JniUdf.cc | 2 +-
cpp/velox/jni/JniUdf.h | 2 +-
cpp/velox/jni/VeloxJniWrapper.cc | 4 +-
.../substrait/SubstraitToVeloxPlanValidator.cc | 4 +-
cpp/velox/tests/MyUdfTest.cc | 21 +-
cpp/velox/udf/examples/CMakeLists.txt | 3 -
cpp/velox/udf/examples/MyUDAF.cc | 226 ++-------------------
cpp/velox/udf/examples/MyUDF.cc | 222 --------------------
cpp/velox/udf/examples/TestMyUDF.cc | 45 ----
docs/developers/VeloxUDF.md | 39 ++--
.../apache/spark/sql/hive/HiveUDAFInspector.scala | 14 +-
.../expression/AggregateFunctionsBuilder.scala | 2 +-
18 files changed, 137 insertions(+), 611 deletions(-)
diff --git
a/backends-velox/src/main/java/org/apache/gluten/udf/UdfJniWrapper.java
b/backends-velox/src/main/java/org/apache/gluten/udf/UdfJniWrapper.java
index 8bfe8bad5..bbe2057c4 100644
--- a/backends-velox/src/main/java/org/apache/gluten/udf/UdfJniWrapper.java
+++ b/backends-velox/src/main/java/org/apache/gluten/udf/UdfJniWrapper.java
@@ -18,5 +18,5 @@ package org.apache.gluten.udf;
public class UdfJniWrapper {
- public static native void getFunctionSignatures();
+ public static native void registerFunctionSignatures();
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index c7bfa9ab5..bb5e7deec 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.backendsapi.ListenerApi
import org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects,
GlutenParquetWriterInjects, GlutenRowSplitter}
import org.apache.gluten.expression.UDFMappings
import org.apache.gluten.init.NativeBackendInitializer
+import org.apache.gluten.udf.UdfJniWrapper
import org.apache.gluten.utils._
import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}
@@ -81,6 +82,7 @@ class VeloxListenerApi extends ListenerApi with Logging {
SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = true)
initialize(conf)
+ UdfJniWrapper.registerFunctionSignatures()
}
override def onDriverShutdown(): Unit = shutdown()
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 438895b25..4ff7f0305 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -30,7 +30,6 @@ import
org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasI
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages,
GlutenFallbackReporter}
-import org.apache.spark.sql.expression.UDFResolver
class VeloxRuleApi extends RuleApi {
import VeloxRuleApi._
@@ -47,7 +46,6 @@ private object VeloxRuleApi {
// Regular Spark rules.
injector.injectOptimizerRule(CollectRewriteRule.apply)
injector.injectOptimizerRule(HLLRewriteRule.apply)
- UDFResolver.getFunctionSignatures().foreach(injector.injectFunction)
injector.injectPostHocResolutionRule(ArrowConvertorRule.apply)
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
index 9c5b68e7b..fe5e0d92d 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
@@ -30,6 +30,8 @@ import org.apache.gluten.utils.VeloxIntermediateData
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.expression.UDFResolver
+import org.apache.spark.sql.hive.HiveUDAFInspector
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -681,14 +683,25 @@ object VeloxAggregateFunctionsBuilder {
aggregateFunc: AggregateFunction,
mode: AggregateMode): Long = {
val functionMap = args.asInstanceOf[JHashMap[String, JLong]]
- val sigName =
AggregateFunctionsBuilder.getSubstraitFunctionName(aggregateFunc)
+ val (sigName, aggFunc) =
+ try {
+ (AggregateFunctionsBuilder.getSubstraitFunctionName(aggregateFunc),
aggregateFunc)
+ } catch {
+ case e: GlutenNotSupportException =>
+ HiveUDAFInspector.getUDAFClassName(aggregateFunc) match {
+ case Some(udafClass) if UDFResolver.UDAFNames.contains(udafClass)
=>
+ (udafClass,
UDFResolver.getUdafExpression(udafClass)(aggregateFunc.children))
+ case _ => throw e
+ }
+ case e: Throwable => throw e
+ }
ExpressionBuilder.newScalarFunction(
functionMap,
ConverterUtils.makeFuncName(
// Substrait-to-Velox procedure will choose appropriate companion
function if needed.
sigName,
- VeloxIntermediateData.getInputTypes(aggregateFunc, mode ==
PartialMerge || mode == Final),
+ VeloxIntermediateData.getInputTypes(aggFunc, mode == PartialMerge ||
mode == Final),
FunctionConfig.REQ
)
)
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
index 39032e46f..db166a513 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
@@ -18,16 +18,14 @@ package org.apache.spark.sql.expression
import org.apache.gluten.backendsapi.velox.VeloxBackendSettings
import org.apache.gluten.exception.{GlutenException, GlutenNotSupportException}
-import org.apache.gluten.expression.{ConverterUtils, ExpressionTransformer,
ExpressionType, GenericExpressionTransformer, Transformable}
-import org.apache.gluten.udf.UdfJniWrapper
+import org.apache.gluten.expression._
import org.apache.gluten.vectorized.JniWorkspace
-import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
+import org.apache.spark.{SparkConf, SparkFiles}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast,
Expression, ExpressionInfo, Unevaluable}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast,
Expression, Unevaluable}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext,
ExprCode}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -334,32 +332,6 @@ object UDFResolver extends Logging {
.mkString(",")
}
- def getFunctionSignatures(): Seq[(FunctionIdentifier, ExpressionInfo,
FunctionBuilder)] = {
- val sparkContext = SparkContext.getActive.get
- val sparkConf = sparkContext.conf
- val udfLibPaths =
sparkConf.getOption(VeloxBackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)
-
- udfLibPaths match {
- case None =>
- Seq.empty
- case Some(_) =>
- UdfJniWrapper.getFunctionSignatures()
- UDFNames.map {
- name =>
- (
- new FunctionIdentifier(name),
- new ExpressionInfo(classOf[UDFExpression].getName, name),
- (e: Seq[Expression]) => getUdfExpression(name, name)(e))
- }.toSeq ++ UDAFNames.map {
- name =>
- (
- new FunctionIdentifier(name),
- new
ExpressionInfo(classOf[UserDefinedAggregateFunction].getName, name),
- (e: Seq[Expression]) => getUdafExpression(name)(e))
- }.toSeq
- }
- }
-
private def checkAllowTypeConversion: Boolean = {
SQLConf.get
.getConfString(VeloxBackendSettings.GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION,
"false")
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala
index 596757df3..f85103deb 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.expression
-import org.apache.gluten.backendsapi.velox.VeloxBackendSettings
import org.apache.gluten.tags.{SkipTestTags, UDFTest}
import org.apache.spark.SparkConf
@@ -25,7 +24,6 @@ import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.expression.UDFResolver
import java.nio.file.Paths
-import java.sql.Date
abstract class VeloxUdfSuite extends GlutenQueryTest with SQLHelper {
@@ -92,64 +90,55 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with
SQLHelper {
.set("spark.memory.offHeap.size", "1024MB")
}
- test("test udf") {
- val df = spark.sql("""select
- | myudf1(100L),
- | myudf2(1),
- | myudf2(1L),
- | myudf3(),
- | myudf3(1),
- | myudf3(1, 2, 3),
- | myudf3(1L),
- | myudf3(1L, 2L, 3L),
- | mydate(cast('2024-03-25' as date), 5)
- |""".stripMargin)
- assert(
- df.collect()
- .sameElements(Array(Row(105L, 6, 6L, 5, 6, 11, 6L, 11L,
Date.valueOf("2024-03-30")))))
- }
-
- test("test udf allow type conversion") {
- withSQLConf(VeloxBackendSettings.GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION ->
"true") {
- val df = spark.sql("""select myudf1("100"), myudf1(1),
mydate('2024-03-25', 5)""")
- assert(
- df.collect()
- .sameElements(Array(Row(105L, 6L, Date.valueOf("2024-03-30")))))
- }
-
- withSQLConf(VeloxBackendSettings.GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION ->
"false") {
- assert(
- spark
- .sql("select mydate2('2024-03-25', 5)")
- .collect()
- .sameElements(Array(Row(Date.valueOf("2024-03-30")))))
- }
- }
-
- test("test udaf") {
- val df = spark.sql("""select
- | myavg(1),
- | myavg(1L),
- | myavg(cast(1.0 as float)),
- | myavg(cast(1.0 as double)),
- | mycount_if(true)
- |""".stripMargin)
- df.collect()
- assert(
- df.collect()
- .sameElements(Array(Row(1.0, 1.0, 1.0, 1.0, 1L))))
- }
+ // Aggregate result can be flaky.
+ ignore("test native hive udaf") {
+ val tbl = "test_hive_udaf_replacement"
+ withTempPath {
+ dir =>
+ try {
+ // Check native hive udaf has been registered.
+ val udafClass = "test.org.apache.spark.sql.MyDoubleAvg"
+ assert(UDFResolver.UDAFNames.contains(udafClass))
- test("test udaf allow type conversion") {
- withSQLConf(VeloxBackendSettings.GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION ->
"true") {
- val df = spark.sql("""select myavg("1"), myavg("1.0"),
mycount_if("true")""")
- assert(
- df.collect()
- .sameElements(Array(Row(1.0, 1.0, 1L))))
+ spark.sql(s"""
+ |CREATE TEMPORARY FUNCTION my_double_avg
+ |AS '$udafClass'
+ |""".stripMargin)
+ spark.sql(s"""
+ |CREATE EXTERNAL TABLE $tbl
+ |LOCATION 'file://$dir'
+ |AS select * from values (1, '1'), (2, '2'), (3, '3')
+ |""".stripMargin)
+ val df = spark.sql(s"""select
+ | my_double_avg(cast(col1 as double)),
+ | my_double_avg(cast(col2 as double))
+ | from $tbl
+ |""".stripMargin)
+ val nativeImplicitConversionDF = spark.sql(s"""select
+ | my_double_avg(col1),
+ | my_double_avg(col2)
+ | from $tbl
+ |""".stripMargin)
+ val nativeResult = df.collect()
+ val nativeImplicitConversionResult =
nativeImplicitConversionDF.collect()
+
+ UDFResolver.UDAFNames.remove(udafClass)
+ val fallbackDF = spark.sql(s"""select
+ | my_double_avg(cast(col1 as double)),
+ | my_double_avg(cast(col2 as double))
+ | from $tbl
+ |""".stripMargin)
+ val fallbackResult = fallbackDF.collect()
+ assert(nativeResult.sameElements(fallbackResult))
+ assert(nativeImplicitConversionResult.sameElements(fallbackResult))
+ } finally {
+ spark.sql(s"DROP TABLE IF EXISTS $tbl")
+ spark.sql(s"DROP TEMPORARY FUNCTION IF EXISTS my_double_avg")
+ }
}
}
- test("test hive udf replacement") {
+ test("test native hive udf") {
val tbl = "test_hive_udf_replacement"
withTempPath {
dir =>
@@ -169,12 +158,15 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with
SQLHelper {
|AS 'org.apache.spark.sql.hive.execution.UDFStringString'
|""".stripMargin)
+ val nativeResultWithImplicitConversion =
+ spark.sql(s"""SELECT hive_string_string(col1, 'a') FROM
$tbl""").collect()
val nativeResult =
spark.sql(s"""SELECT hive_string_string(col2, 'a') FROM
$tbl""").collect()
// Unregister native hive udf to fallback.
UDFResolver.UDFNames.remove("org.apache.spark.sql.hive.execution.UDFStringString")
val fallbackResult =
spark.sql(s"""SELECT hive_string_string(col2, 'a') FROM
$tbl""").collect()
+
assert(nativeResultWithImplicitConversion.sameElements(fallbackResult))
assert(nativeResult.sameElements(fallbackResult))
// Add an unimplemented udf to the map to test fallback of
registered native hive udf.
@@ -237,6 +229,7 @@ class VeloxUdfSuiteLocal extends VeloxUdfSuite {
super.sparkConf
.set("spark.files", udfLibPath)
.set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths",
udfLibRelativePath)
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
}
}
diff --git a/cpp/velox/jni/JniUdf.cc b/cpp/velox/jni/JniUdf.cc
index 8230724f1..aa5d1c093 100644
--- a/cpp/velox/jni/JniUdf.cc
+++ b/cpp/velox/jni/JniUdf.cc
@@ -49,7 +49,7 @@ void gluten::finalizeVeloxJniUDF(JNIEnv* env) {
env->DeleteGlobalRef(udfResolverClass);
}
-void gluten::jniGetFunctionSignatures(JNIEnv* env) {
+void gluten::jniRegisterFunctionSignatures(JNIEnv* env) {
auto udfLoader = gluten::UdfLoader::getInstance();
const auto& signatures = udfLoader->getRegisteredUdfSignatures();
diff --git a/cpp/velox/jni/JniUdf.h b/cpp/velox/jni/JniUdf.h
index b91ac08de..568439d18 100644
--- a/cpp/velox/jni/JniUdf.h
+++ b/cpp/velox/jni/JniUdf.h
@@ -27,6 +27,6 @@ void initVeloxJniUDF(JNIEnv* env);
void finalizeVeloxJniUDF(JNIEnv* env);
-void jniGetFunctionSignatures(JNIEnv* env);
+void jniRegisterFunctionSignatures(JNIEnv* env);
} // namespace gluten
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 5df3a478e..37c90643a 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -88,11 +88,11 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_init_NativeBackendInitializer_shut
JNI_METHOD_END()
}
-JNIEXPORT void JNICALL
Java_org_apache_gluten_udf_UdfJniWrapper_getFunctionSignatures( // NOLINT
+JNIEXPORT void JNICALL
Java_org_apache_gluten_udf_UdfJniWrapper_registerFunctionSignatures( // NOLINT
JNIEnv* env,
jclass) {
JNI_METHOD_START
- gluten::jniGetFunctionSignatures(env);
+ gluten::jniRegisterFunctionSignatures(env);
JNI_METHOD_END()
}
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 7bb0eab77..60a8d38d1 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -1162,11 +1162,11 @@ bool SubstraitToVeloxPlanValidator::validate(const
::substrait::AggregateRel& ag
"regr_sxy",
"regr_replacement"};
- auto udfFuncs = UdfLoader::getInstance()->getRegisteredUdafNames();
+ auto udafFuncs = UdfLoader::getInstance()->getRegisteredUdafNames();
for (const auto& funcSpec : funcSpecs) {
auto funcName = SubstraitParser::getNameBeforeDelimiter(funcSpec);
- if (supportedAggFuncs.find(funcName) == supportedAggFuncs.end() &&
udfFuncs.find(funcName) == udfFuncs.end()) {
+ if (supportedAggFuncs.find(funcName) == supportedAggFuncs.end() &&
udafFuncs.find(funcName) == udafFuncs.end()) {
LOG_VALIDATION_MSG(funcName + " was not supported in AggregateRel.");
return false;
}
diff --git a/cpp/velox/tests/MyUdfTest.cc b/cpp/velox/tests/MyUdfTest.cc
index 46898b38c..c9849d67d 100644
--- a/cpp/velox/tests/MyUdfTest.cc
+++ b/cpp/velox/tests/MyUdfTest.cc
@@ -34,12 +34,17 @@ class MyUdfTest : public FunctionBaseTest {
}
};
-TEST_F(MyUdfTest, myudf1) {
- const auto myudf1 = [&](const int64_t& number) {
- return evaluateOnce<int64_t>("myudf1(c0)", BIGINT(),
std::make_optional(number));
- };
-
- EXPECT_EQ(5, myudf1(0));
- EXPECT_EQ(105, myudf1(100));
- EXPECT_EQ(3147483652, myudf1(3147483647)); // int64
+TEST_F(MyUdfTest, hivestringstring) {
+ auto map = facebook::velox::exec::vectorFunctionFactories();
+ const std::string candidate =
{"org.apache.spark.sql.hive.execution.UDFStringString"};
+ ASSERT(map.withRLock([&candidate](auto& self) -> bool {
+ auto iter = self.find(candidate);
+ std::unordered_map<std::string, std::string> values;
+ const facebook::velox::core::QueryConfig config(std::move(values));
+ iter->second.factory(
+ candidate,
+ {facebook::velox::exec::VectorFunctionArg{facebook::velox::VARCHAR()},
+ facebook::velox::exec::VectorFunctionArg{facebook::velox::VARCHAR()}},
+ config) != nullptr;
+ });)
}
\ No newline at end of file
diff --git a/cpp/velox/udf/examples/CMakeLists.txt
b/cpp/velox/udf/examples/CMakeLists.txt
index 3c51859ee..32f39425b 100644
--- a/cpp/velox/udf/examples/CMakeLists.txt
+++ b/cpp/velox/udf/examples/CMakeLists.txt
@@ -18,6 +18,3 @@ target_link_libraries(myudf velox)
add_library(myudaf SHARED "MyUDAF.cc")
target_link_libraries(myudaf velox)
-
-add_executable(test_myudf "TestMyUDF.cc")
-target_link_libraries(test_myudf velox)
diff --git a/cpp/velox/udf/examples/MyUDAF.cc b/cpp/velox/udf/examples/MyUDAF.cc
index 710bce53a..516404b55 100644
--- a/cpp/velox/udf/examples/MyUDAF.cc
+++ b/cpp/velox/udf/examples/MyUDAF.cc
@@ -90,7 +90,7 @@ class AverageAggregate {
}
bool writeFinalResult(exec::out_type<OutputType>& out) {
- out = sum_ / count_;
+ out = sum_ / count_ + 100.0;
return true;
}
@@ -103,12 +103,12 @@ class AverageAggregate {
class MyAvgRegisterer final : public gluten::UdafRegisterer {
int getNumUdaf() override {
- return 4;
+ return 2;
}
void populateUdafEntries(int& index, gluten::UdafEntry* udafEntries)
override {
- for (const auto& argTypes : {myAvgArg1_, myAvgArg2_, myAvgArg3_,
myAvgArg4_}) {
- udafEntries[index++] = {name_.c_str(), kDouble, 1, argTypes,
myAvgIntermediateType_};
+ for (const auto& argTypes : {myAvgArgFloat_, myAvgArgDouble_}) {
+ udafEntries[index++] = {name_.c_str(), kDouble, 1, argTypes,
myAvgIntermediateType_, false, true};
}
}
@@ -120,13 +120,11 @@ class MyAvgRegisterer final : public
gluten::UdafRegisterer {
exec::AggregateRegistrationResult registerSimpleAverageAggregate() {
std::vector<std::shared_ptr<exec::AggregateFunctionSignature>> signatures;
- for (const auto& inputType : {"smallint", "integer", "bigint", "double"}) {
- signatures.push_back(exec::AggregateFunctionSignatureBuilder()
- .returnType("double")
- .intermediateType("row(double,bigint)")
- .argumentType(inputType)
- .build());
- }
+ signatures.push_back(exec::AggregateFunctionSignatureBuilder()
+ .returnType("double")
+ .intermediateType("row(double,bigint)")
+ .argumentType("double")
+ .build());
signatures.push_back(exec::AggregateFunctionSignatureBuilder()
.returnType("real")
@@ -146,12 +144,6 @@ class MyAvgRegisterer final : public
gluten::UdafRegisterer {
auto inputType = argTypes[0];
if (exec::isRawInput(step)) {
switch (inputType->kind()) {
- case TypeKind::SMALLINT:
- return
std::make_unique<SimpleAggregateAdapter<AverageAggregate<int16_t>>>(resultType);
- case TypeKind::INTEGER:
- return
std::make_unique<SimpleAggregateAdapter<AverageAggregate<int32_t>>>(resultType);
- case TypeKind::BIGINT:
- return
std::make_unique<SimpleAggregateAdapter<AverageAggregate<int64_t>>>(resultType);
case TypeKind::REAL:
return
std::make_unique<SimpleAggregateAdapter<AverageAggregate<float>>>(resultType);
case TypeKind::DOUBLE:
@@ -175,207 +167,14 @@ class MyAvgRegisterer final : public
gluten::UdafRegisterer {
true /*overwrite*/);
}
- const std::string name_ = "myavg";
- const char* myAvgArg1_[1] = {kInteger};
- const char* myAvgArg2_[1] = {kBigInt};
- const char* myAvgArg3_[1] = {kFloat};
- const char* myAvgArg4_[1] = {kDouble};
+ const std::string name_ = "test.org.apache.spark.sql.MyDoubleAvg";
+ const char* myAvgArgFloat_[1] = {kFloat};
+ const char* myAvgArgDouble_[1] = {kDouble};
const char* myAvgIntermediateType_ = "struct<a:double,b:bigint>";
};
} // namespace myavg
-namespace mycountif {
-
-// Copied from velox/functions/prestosql/aggregates/CountIfAggregate.cpp
-class CountIfAggregate : public exec::Aggregate {
- public:
- explicit CountIfAggregate() : exec::Aggregate(BIGINT()) {}
-
- int32_t accumulatorFixedWidthSize() const override {
- return sizeof(int64_t);
- }
-
- void extractAccumulators(char** groups, int32_t numGroups, VectorPtr*
result) override {
- extractValues(groups, numGroups, result);
- }
-
- void extractValues(char** groups, int32_t numGroups, VectorPtr* result)
override {
- auto* vector = (*result)->as<FlatVector<int64_t>>();
- VELOX_CHECK(vector);
- vector->resize(numGroups);
-
- auto* rawValues = vector->mutableRawValues();
- for (vector_size_t i = 0; i < numGroups; ++i) {
- rawValues[i] = *value<int64_t>(groups[i]);
- }
- }
-
- void addRawInput(
- char** groups,
- const SelectivityVector& rows,
- const std::vector<VectorPtr>& args,
- bool /*mayPushdown*/) override {
- DecodedVector decoded(*args[0], rows);
-
- if (decoded.isConstantMapping()) {
- if (decoded.isNullAt(0)) {
- return;
- }
- if (decoded.valueAt<bool>(0)) {
- rows.applyToSelected([&](vector_size_t i) { addToGroup(groups[i], 1);
});
- }
- } else if (decoded.mayHaveNulls()) {
- rows.applyToSelected([&](vector_size_t i) {
- if (decoded.isNullAt(i)) {
- return;
- }
- if (decoded.valueAt<bool>(i)) {
- addToGroup(groups[i], 1);
- }
- });
- } else {
- rows.applyToSelected([&](vector_size_t i) {
- if (decoded.valueAt<bool>(i)) {
- addToGroup(groups[i], 1);
- }
- });
- }
- }
-
- void addIntermediateResults(
- char** groups,
- const SelectivityVector& rows,
- const std::vector<VectorPtr>& args,
- bool /*mayPushdown*/) override {
- DecodedVector decoded(*args[0], rows);
-
- if (decoded.isConstantMapping()) {
- auto numTrue = decoded.valueAt<int64_t>(0);
- rows.applyToSelected([&](vector_size_t i) { addToGroup(groups[i],
numTrue); });
- return;
- }
-
- rows.applyToSelected([&](vector_size_t i) {
- auto numTrue = decoded.valueAt<int64_t>(i);
- addToGroup(groups[i], numTrue);
- });
- }
-
- void addSingleGroupRawInput(
- char* group,
- const SelectivityVector& rows,
- const std::vector<VectorPtr>& args,
- bool /*mayPushdown*/) override {
- DecodedVector decoded(*args[0], rows);
-
- // Constant mapping - check once and add number of selected rows if true.
- if (decoded.isConstantMapping()) {
- if (!decoded.isNullAt(0)) {
- auto isTrue = decoded.valueAt<bool>(0);
- if (isTrue) {
- addToGroup(group, rows.countSelected());
- }
- }
- return;
- }
-
- int64_t numTrue = 0;
- if (decoded.mayHaveNulls()) {
- rows.applyToSelected([&](vector_size_t i) {
- if (decoded.isNullAt(i)) {
- return;
- }
- if (decoded.valueAt<bool>(i)) {
- ++numTrue;
- }
- });
- } else {
- rows.applyToSelected([&](vector_size_t i) {
- if (decoded.valueAt<bool>(i)) {
- ++numTrue;
- }
- });
- }
- addToGroup(group, numTrue);
- }
-
- void addSingleGroupIntermediateResults(
- char* group,
- const SelectivityVector& rows,
- const std::vector<VectorPtr>& args,
- bool /*mayPushdown*/) override {
- auto arg = args[0]->as<SimpleVector<int64_t>>();
-
- int64_t numTrue = 0;
- rows.applyToSelected([&](auto row) { numTrue += arg->valueAt(row); });
-
- addToGroup(group, numTrue);
- }
-
- protected:
- void initializeNewGroupsInternal(char** groups, folly::Range<const
vector_size_t*> indices) override {
- for (auto i : indices) {
- *value<int64_t>(groups[i]) = 0;
- }
- }
-
- private:
- inline void addToGroup(char* group, int64_t numTrue) {
- *value<int64_t>(group) += numTrue;
- }
-};
-
-class MyCountIfRegisterer final : public gluten::UdafRegisterer {
- int getNumUdaf() override {
- return 1;
- }
-
- void populateUdafEntries(int& index, gluten::UdafEntry* udafEntries)
override {
- udafEntries[index++] = {name_.c_str(), kBigInt, 1, myCountIfArg_, kBigInt};
- }
-
- void registerSignatures() override {
- registerCountIfAggregate();
- }
-
- private:
- void registerCountIfAggregate() {
- std::vector<std::shared_ptr<exec::AggregateFunctionSignature>> signatures{
- exec::AggregateFunctionSignatureBuilder()
- .returnType("bigint")
- .intermediateType("bigint")
- .argumentType("boolean")
- .build(),
- };
-
- exec::registerAggregateFunction(
- name_,
- std::move(signatures),
- [this](
- core::AggregationNode::Step step,
- std::vector<TypePtr> argTypes,
- const TypePtr& /*resultType*/,
- const core::QueryConfig& /*config*/) ->
std::unique_ptr<exec::Aggregate> {
- VELOX_CHECK_EQ(argTypes.size(), 1, "{} takes one argument", name_);
-
- auto isPartial = exec::isRawInput(step);
- if (isPartial) {
- VELOX_CHECK_EQ(argTypes[0]->kind(), TypeKind::BOOLEAN, "{}
function only accepts boolean parameter", name_);
- }
-
- return std::make_unique<CountIfAggregate>();
- },
- {false /*orderSensitive*/},
- true,
- true);
- }
-
- const std::string name_ = "mycount_if";
- const char* myCountIfArg_[1] = {kBoolean};
-};
-} // namespace mycountif
-
std::vector<std::shared_ptr<gluten::UdafRegisterer>>& globalRegisters() {
static std::vector<std::shared_ptr<gluten::UdafRegisterer>> registerers;
return registerers;
@@ -388,7 +187,6 @@ void setupRegisterers() {
}
auto& registerers = globalRegisters();
registerers.push_back(std::make_shared<myavg::MyAvgRegisterer>());
- registerers.push_back(std::make_shared<mycountif::MyCountIfRegisterer>());
inited = true;
}
} // namespace
diff --git a/cpp/velox/udf/examples/MyUDF.cc b/cpp/velox/udf/examples/MyUDF.cc
index 75e68413a..783699614 100644
--- a/cpp/velox/udf/examples/MyUDF.cc
+++ b/cpp/velox/udf/examples/MyUDF.cc
@@ -32,223 +32,6 @@ static const char* kBigInt = "bigint";
static const char* kDate = "date";
static const char* kVarChar = "varchar";
-namespace myudf {
-
-template <TypeKind Kind>
-class PlusFiveFunction : public exec::VectorFunction {
- public:
- explicit PlusFiveFunction() {}
-
- void apply(
- const SelectivityVector& rows,
- std::vector<VectorPtr>& args,
- const TypePtr& /* outputType */,
- exec::EvalCtx& context,
- VectorPtr& result) const override {
- using nativeType = typename TypeTraits<Kind>::NativeType;
-
- BaseVector::ensureWritable(rows, createScalarType<Kind>(), context.pool(),
result);
-
- auto* flatResult = result->asFlatVector<nativeType>();
- auto* rawResult = flatResult->mutableRawValues();
-
- flatResult->clearNulls(rows);
-
- rows.applyToSelected([&](auto row) { rawResult[row] = 5; });
-
- if (args.size() == 0) {
- return;
- }
-
- for (int i = 0; i < args.size(); i++) {
- auto& arg = args[i];
- VELOX_CHECK(arg->isFlatEncoding() || arg->isConstantEncoding());
- if (arg->isConstantEncoding()) {
- auto value = arg->as<ConstantVector<nativeType>>()->valueAt(0);
- rows.applyToSelected([&](auto row) { rawResult[row] += value; });
- } else {
- auto* rawInput = arg->as<FlatVector<nativeType>>()->rawValues();
- rows.applyToSelected([&](auto row) { rawResult[row] += rawInput[row];
});
- }
- }
- }
-};
-
-static std::shared_ptr<facebook::velox::exec::VectorFunction> makePlusConstant(
- const std::string& /*name*/,
- const std::vector<exec::VectorFunctionArg>& inputArgs,
- const core::QueryConfig& /*config*/) {
- if (inputArgs.size() == 0) {
- return std::make_shared<PlusFiveFunction<TypeKind::INTEGER>>();
- }
- auto typeKind = inputArgs[0].type->kind();
- switch (typeKind) {
- case TypeKind::INTEGER:
- return std::make_shared<PlusFiveFunction<TypeKind::INTEGER>>();
- case TypeKind::BIGINT:
- return std::make_shared<PlusFiveFunction<TypeKind::BIGINT>>();
- default:
- VELOX_UNREACHABLE();
- }
-}
-
-// name: myudf1
-// signatures:
-// bigint -> bigint
-// type: VectorFunction
-class MyUdf1Registerer final : public gluten::UdfRegisterer {
- public:
- int getNumUdf() override {
- return 1;
- }
-
- void populateUdfEntries(int& index, gluten::UdfEntry* udfEntries) override {
- udfEntries[index++] = {name_.c_str(), kBigInt, 1, bigintArg_};
- }
-
- void registerSignatures() override {
- facebook::velox::exec::registerVectorFunction(
- name_, bigintSignatures(),
std::make_unique<PlusFiveFunction<facebook::velox::TypeKind::BIGINT>>());
- }
-
- private:
- std::vector<std::shared_ptr<exec::FunctionSignature>> bigintSignatures() {
- return
{exec::FunctionSignatureBuilder().returnType("bigint").argumentType("bigint").build()};
- }
-
- const std::string name_ = "myudf1";
- const char* bigintArg_[1] = {kBigInt};
-};
-
-// name: myudf2
-// signatures:
-// integer -> integer
-// bigint -> bigint
-// type: StatefulVectorFunction
-class MyUdf2Registerer final : public gluten::UdfRegisterer {
- public:
- int getNumUdf() override {
- return 2;
- }
-
- void populateUdfEntries(int& index, gluten::UdfEntry* udfEntries) override {
- udfEntries[index++] = {name_.c_str(), kInteger, 1, integerArg_};
- udfEntries[index++] = {name_.c_str(), kBigInt, 1, bigintArg_};
- }
-
- void registerSignatures() override {
- facebook::velox::exec::registerStatefulVectorFunction(name_,
integerAndBigintSignatures(), makePlusConstant);
- }
-
- private:
- std::vector<std::shared_ptr<exec::FunctionSignature>>
integerAndBigintSignatures() {
- return {
-
exec::FunctionSignatureBuilder().returnType("integer").argumentType("integer").build(),
-
exec::FunctionSignatureBuilder().returnType("bigint").argumentType("bigint").build()};
- }
-
- const std::string name_ = "myudf2";
- const char* integerArg_[1] = {kInteger};
- const char* bigintArg_[1] = {kBigInt};
-};
-
-// name: myudf3
-// signatures:
-// [integer,] ... -> integer
-// bigint, [bigint,] ... -> bigint
-// type: StatefulVectorFunction with variable arity
-class MyUdf3Registerer final : public gluten::UdfRegisterer {
- public:
- int getNumUdf() override {
- return 2;
- }
-
- void populateUdfEntries(int& index, gluten::UdfEntry* udfEntries) override {
- udfEntries[index++] = {name_.c_str(), kInteger, 1, integerArg_, true};
- udfEntries[index++] = {name_.c_str(), kBigInt, 2, bigintArgs_, true};
- }
-
- void registerSignatures() override {
- facebook::velox::exec::registerStatefulVectorFunction(
- name_, integerAndBigintSignaturesWithVariableArity(),
makePlusConstant);
- }
-
- private:
- std::vector<std::shared_ptr<exec::FunctionSignature>>
integerAndBigintSignaturesWithVariableArity() {
- return {
-
exec::FunctionSignatureBuilder().returnType("integer").argumentType("integer").variableArity().build(),
- exec::FunctionSignatureBuilder()
- .returnType("bigint")
- .argumentType("bigint")
- .argumentType("bigint")
- .variableArity()
- .build()};
- }
-
- const std::string name_ = "myudf3";
- const char* integerArg_[1] = {kInteger};
- const char* bigintArgs_[2] = {kBigInt, kBigInt};
-};
-} // namespace myudf
-
-namespace mydate {
-template <typename T>
-struct MyDateSimpleFunction {
- VELOX_DEFINE_FUNCTION_TYPES(T);
-
- FOLLY_ALWAYS_INLINE void call(int32_t& result, const arg_type<Date>& date,
const arg_type<int32_t> addition) {
- result = date + addition;
- }
-};
-
-// name: mydate
-// signatures:
-// date, integer -> bigint
-// type: SimpleFunction
-class MyDateRegisterer final : public gluten::UdfRegisterer {
- public:
- int getNumUdf() override {
- return 1;
- }
-
- void populateUdfEntries(int& index, gluten::UdfEntry* udfEntries) override {
- udfEntries[index++] = {name_.c_str(), kDate, 2, myDateArg_};
- }
-
- void registerSignatures() override {
- facebook::velox::registerFunction<mydate::MyDateSimpleFunction, Date,
Date, int32_t>({name_});
- }
-
- private:
- const std::string name_ = "mydate";
- const char* myDateArg_[2] = {kDate, kInteger};
-};
-
-// name: mydate
-// signatures:
-// date, integer -> bigint
-// type: SimpleFunction
-// enable type conversion
-class MyDate2Registerer final : public gluten::UdfRegisterer {
- public:
- int getNumUdf() override {
- return 1;
- }
-
- void populateUdfEntries(int& index, gluten::UdfEntry* udfEntries) override {
- udfEntries[index++] = {name_.c_str(), kDate, 2, myDateArg_, false, true};
- }
-
- void registerSignatures() override {
- facebook::velox::registerFunction<mydate::MyDateSimpleFunction, Date,
Date, int32_t>({name_});
- }
-
- private:
- const std::string name_ = "mydate2";
- const char* myDateArg_[2] = {kDate, kInteger};
-};
-} // namespace mydate
-
namespace hivestringstring {
template <typename T>
struct HiveStringStringFunction {
@@ -297,11 +80,6 @@ void setupRegisterers() {
return;
}
auto& registerers = globalRegisters();
- registerers.push_back(std::make_shared<myudf::MyUdf1Registerer>());
- registerers.push_back(std::make_shared<myudf::MyUdf2Registerer>());
- registerers.push_back(std::make_shared<myudf::MyUdf3Registerer>());
- registerers.push_back(std::make_shared<mydate::MyDateRegisterer>());
- registerers.push_back(std::make_shared<mydate::MyDate2Registerer>());
registerers.push_back(std::make_shared<hivestringstring::HiveStringStringRegisterer>());
inited = true;
}
diff --git a/cpp/velox/udf/examples/TestMyUDF.cc
b/cpp/velox/udf/examples/TestMyUDF.cc
deleted file mode 100644
index 794dd0613..000000000
--- a/cpp/velox/udf/examples/TestMyUDF.cc
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <iostream>
-#include "udf/UdfLoader.h"
-
-#include "velox/expression/VectorFunction.h"
-
-int main() {
- auto udfLoader = gluten::UdfLoader::getInstance();
- udfLoader->loadUdfLibraries("libmyudf.so");
- udfLoader->registerUdf();
-
- auto map = facebook::velox::exec::vectorFunctionFactories();
- const std::vector<std::string> candidates = {"myudf1", "myudf2"};
- auto f = map.withRLock([&candidates](auto& self) -> bool {
- return std::all_of(candidates.begin(), candidates.end(), [&](const auto&
funcName) {
- auto iter = self.find(funcName);
- std::unordered_map<std::string, std::string> values;
- const facebook::velox::core::QueryConfig config(std::move(values));
- return iter->second.factory(
- funcName,
{facebook::velox::exec::VectorFunctionArg{facebook::velox::BIGINT()}}, config)
!= nullptr;
- });
- });
-
- if (!f) {
- return 1;
- }
-
- return 0;
-}
diff --git a/docs/developers/VeloxUDF.md b/docs/developers/VeloxUDF.md
index 6872f2d0c..4f685cc41 100644
--- a/docs/developers/VeloxUDF.md
+++ b/docs/developers/VeloxUDF.md
@@ -10,9 +10,10 @@ parent: /developer-overview/
## Introduction
Velox backend supports User-Defined Functions (UDF) and User-Defined Aggregate
Functions (UDAF).
-Users can create their own functions using the UDF interface provided in Velox
backend and build libraries for these functions.
-At runtime, the UDF are registered at the start of applications.
-Once registered, Gluten will be able to parse and offload these UDF into Velox
during execution.
+Users can implement custom functions using the UDF interface provided by Velox
and compile them into libraries.
+At runtime, these UDFs are registered alongside their Java implementations via
`CREATE TEMPORARY FUNCTION`.
+Once registered, Gluten can parse and offload these UDFs to Velox during
execution,
+meanwhile ensuring proper fallback to Java UDFs when necessary.
## Create and Build UDF/UDAF library
@@ -39,22 +40,23 @@ The following steps demonstrate how to set up a UDF library
project:
This function is called to register the UDF to Velox function registry.
This is where users should register functions by calling
`facebook::velox::exec::registerVecotorFunction` or other Velox APIs.
- - The interface functions are mapped to marcos in
[Udf.h](../../cpp/velox/udf/Udf.h). Here's an example of how to implement these
functions:
+ - The interface functions are mapped to marcos in
[Udf.h](../../cpp/velox/udf/Udf.h).
+
+ Assuming there is an existing Hive UDF `org.apache.gluten.sql.hive.MyUDF`,
its native UDF can be implemented as follows.
```
- // Filename MyUDF.cc
-
#include <velox/expression/VectorFunction.h>
#include <velox/udf/Udf.h>
namespace {
static const char* kInteger = "integer";
+ static const char* kMyUdfFunctionName = "org.apache.gluten.sql.hive.MyUDF";
}
const int kNumMyUdf = 1;
const char* myUdfArgs[] = {kInteger}:
- gluten::UdfEntry myUdfSig = {"myudf", kInteger, 1, myUdfArgs};
+ gluten::UdfEntry myUdfSig = {kMyUdfFunctionName, kInteger, 1, myUdfArgs};
class MyUdf : public facebook::velox::exec::VectorFunction {
... // Omit concrete implementation
@@ -167,20 +169,25 @@ or
--conf
spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=file:///path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
```
-Run query. The functions `myudf1` and `myudf2` increment the input value by a
constant of 5
+Start `spark-sql` and run query. You need to add jar
"spark-hive_2.12-<spark.version>-tests.jar" to the classpath for hive udf
`org.apache.spark.sql.hive.execution.UDFStringString`
```
-select myudf1(100L), myudf2(1)
+spark-sql (default)> CREATE TEMPORARY FUNCTION hive_string_string AS
'org.apache.spark.sql.hive.execution.UDFStringString';
+Time taken: 0.808 seconds
+spark-sql (default)> select hive_string_string("hello", "world");
+hello world
+Time taken: 3.208 seconds, Fetched 1 row(s)
```
-The output from spark-shell will be like
-
+You can verify the offload with "explain".
```
-+------------------+----------------+
-|udfexpression(100)|udfexpression(1)|
-+------------------+----------------+
-| 105| 6|
-+------------------+----------------+
+spark-sql (default)> explain select hive_string_string("hello", "world");
+== Physical Plan ==
+VeloxColumnarToRowExec
++- ^(2) ProjectExecTransformer [hello world AS hive_string_string(hello,
world)#8]
+ +- ^(2) InputIteratorTransformer[fake_column#9]
+ +- RowToVeloxColumnar
+ +- *(1) Scan OneRowRelation[fake_column#9]
```
## Configurations
diff --git
a/backends-velox/src/main/java/org/apache/gluten/udf/UdfJniWrapper.java
b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala
similarity index 64%
copy from backends-velox/src/main/java/org/apache/gluten/udf/UdfJniWrapper.java
copy to
gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala
index 8bfe8bad5..7c6401b67 100644
--- a/backends-velox/src/main/java/org/apache/gluten/udf/UdfJniWrapper.java
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala
@@ -14,9 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.udf;
+package org.apache.spark.sql.hive
-public class UdfJniWrapper {
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.aggregate.ScalaUDAF
- public static native void getFunctionSignatures();
+object HiveUDAFInspector {
+ def getUDAFClassName(expr: Expression): Option[String] = {
+ expr match {
+ case func: HiveUDAFFunction => Some(func.funcWrapper.functionClassName)
+ case scalaUDAF: ScalaUDAF => Some(scalaUDAF.udaf.getClass.getName)
+ case _ => None
+ }
+ }
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/AggregateFunctionsBuilder.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/AggregateFunctionsBuilder.scala
index bd73b7b7a..15de4a734 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/AggregateFunctionsBuilder.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/AggregateFunctionsBuilder.scala
@@ -56,7 +56,7 @@ object AggregateFunctionsBuilder {
case _ =>
val nameOpt =
ExpressionMappings.expressionsMap.get(aggregateFunc.getClass)
if (nameOpt.isEmpty) {
- throw new UnsupportedOperationException(
+ throw new GlutenNotSupportException(
s"Could not find a valid substrait mapping name for
$aggregateFunc.")
}
nameOpt.get match {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]