This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5894e3679 [VL] Support native UDAF (#5130)
5894e3679 is described below
commit 5894e3679107400c9b155aafb6e5e5c11b6dff16
Author: Rong Ma <[email protected]>
AuthorDate: Mon Apr 1 22:49:33 2024 +0800
[VL] Support native UDAF (#5130)
---
backends-velox/pom.xml | 4 +-
.../backendsapi/velox/SparkPlanExecApiImpl.scala | 6 +-
.../execution/HashAggregateExecTransformer.scala | 3 +
.../apache/spark/sql/expression/UDFResolver.scala | 109 ++++++++++++-
.../apache/gluten/expression/VeloxUdfSuite.scala | 17 +-
cpp/velox/jni/JniUdf.cc | 14 +-
.../substrait/SubstraitToVeloxPlanValidator.cc | 5 +-
cpp/velox/udf/{examples/TestMyUDF.cc => Udaf.h} | 48 +++---
cpp/velox/udf/UdfLoader.cc | 118 +++++++++-----
cpp/velox/udf/UdfLoader.h | 13 +-
cpp/velox/udf/examples/CMakeLists.txt | 7 +-
cpp/velox/udf/examples/MyUDAF.cc | 181 +++++++++++++++++++++
cpp/velox/udf/examples/{MyUDF.cpp => MyUDF.cc} | 10 +-
cpp/velox/udf/examples/TestMyUDF.cc | 15 +-
docs/get-started/Velox.md | 127 ++++++++++-----
15 files changed, 541 insertions(+), 136 deletions(-)
diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml
index 6a598e8c0..70b8b901b 100755
--- a/backends-velox/pom.xml
+++ b/backends-velox/pom.xml
@@ -225,7 +225,9 @@
<junitxml>.</junitxml>
<tagsToExclude>${tagsToExclude}</tagsToExclude>
<systemProperties>
-
<velox.udf.lib.path>${cpp.build.dir}/velox/udf/examples/libmyudf.so</velox.udf.lib.path>
+ <velox.udf.lib.path>
+
${cpp.build.dir}/velox/udf/examples/libmyudf.so,${cpp.build.dir}/velox/udf/examples/libmyudaf.so
+ </velox.udf.lib.path>
</systemProperties>
</configuration>
</plugin>
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
index 6f6286e5c..b723b865a 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
@@ -50,7 +50,7 @@ import
org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleEx
import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.utils.ExecUtil
-import org.apache.spark.sql.expression.{UDFExpression, UDFResolver}
+import org.apache.spark.sql.expression.{UDFExpression, UDFResolver,
UserDefinedAggregateFunction}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -644,7 +644,9 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
Seq(
Sig[HLLAdapter](ExpressionNames.APPROX_DISTINCT),
Sig[UDFExpression](ExpressionNames.UDF_PLACEHOLDER),
- Sig[NaNvl](ExpressionNames.NANVL))
+ Sig[UserDefinedAggregateFunction](ExpressionNames.UDF_PLACEHOLDER),
+ Sig[NaNvl](ExpressionNames.NANVL)
+ )
}
override def genInjectedFunctions()
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
index 2a0ac1ff9..d3f9bd78f 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
@@ -31,6 +31,7 @@ import org.apache.gluten.utils.VeloxIntermediateData
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.expression.UserDefinedAggregateFunction
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -710,6 +711,8 @@ object VeloxAggregateFunctionsBuilder {
if (ignoreNulls) sigName = Some(ExpressionNames.FIRST_IGNORE_NULL)
case Last(_, ignoreNulls) =>
if (ignoreNulls) sigName = Some(ExpressionNames.LAST_IGNORE_NULL)
+ case UserDefinedAggregateFunction(name, _, _, _, _) =>
+ sigName = Some(name)
case _ =>
}
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
index e97450d96..6c8111350 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
@@ -27,10 +27,13 @@ import org.apache.gluten.vectorized.JniWorkspace
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFiles}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression, ExpressionInfo}
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext,
ExprCode}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.util.Utils
import com.google.common.collect.Lists
@@ -42,6 +45,33 @@ import java.nio.file.{Files, FileVisitOption, Paths}
import scala.collection.JavaConverters.asScalaIteratorConverter
import scala.collection.mutable
+/**
+ * Catalyst placeholder for an aggregate function implemented in a UDF library that was
+ * registered from native code through `UDFResolver.registerUDAF`. Instances are built by
+ * `UDFResolver.getUdafExpression`; the Velox aggregate builder maps them to a native
+ * function by `name`. The function cannot be evaluated or code-generated on the JVM.
+ *
+ * @param name                function name as registered by the UDF library
+ * @param dataType            result type of the aggregate
+ * @param nullable            whether the result may be null
+ * @param children            argument expressions
+ * @param aggBufferAttributes one attribute per field of the registered intermediate
+ *                            (struct) type, describing the aggregation buffer
+ */
+case class UserDefinedAggregateFunction(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ children: Seq[Expression],
+ override val aggBufferAttributes: Seq[AttributeReference])
+ extends AggregateFunction {
+
+ // Buffer schema, built field-by-field from aggBufferAttributes (name, type, nullability,
+ // metadata).
+ override def aggBufferSchema: StructType =
+ StructType(
+ aggBufferAttributes.map(a => StructField(a.name, a.dataType, a.nullable,
a.metadata)))
+
+ // Copies of the buffer attributes with fresh expression IDs; Spark uses these to refer to
+ // buffer values produced by a previous aggregation stage.
+ override val inputAggBufferAttributes: Seq[AttributeReference] =
+ aggBufferAttributes.map(_.newInstance())
+
+ // Native-only function: interpreted evaluation always fails.
+ final override def eval(input: InternalRow = null): Any =
+ throw QueryExecutionErrors.cannotEvaluateExpressionError(this)
+
+ // Native-only function: whole-stage codegen always fails.
+ final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode):
ExprCode =
+ throw QueryExecutionErrors.cannotGenerateCodeForExpressionError(this)
+
+ // Only the arguments change; name, type and buffer attributes are carried over by copy.
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): Expression = {
+ this.copy(children = newChildren)
+ }
+}
+
case class UDFExpression(
name: String,
dataType: DataType,
@@ -78,9 +108,14 @@ case class UDFExpression(
object UDFResolver extends Logging {
private val UDFNames = mutable.HashSet[String]()
-
+ // (udf_name, arg1, arg2, ...) => return type
private val UDFMap = mutable.HashMap[(String, Seq[DataType]),
ExpressionType]()
+ private val UDAFNames = mutable.HashSet[String]()
+ // (udaf_name, arg1, arg2, ...) => return type, intermediate attributes
+ private val UDAFMap =
+ mutable.HashMap[(String, Seq[DataType]), (ExpressionType,
Seq[AttributeReference])]()
+
private val LIB_EXTENSION = ".so"
private lazy val isDriver: Boolean =
@@ -103,7 +138,41 @@ object UDFResolver extends Logging {
(name,
argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType)),
returnType)
UDFNames += name
- logInfo(s"Registered UDF: $name -> $returnType")
+ logInfo(s"Registered UDF: $name($argTypes) -> $returnType")
+ }
+
+  /**
+   * Entry point invoked from native code (JniUdf.cc looks this method up by name with the
+   * JNI signature `(Ljava/lang/String;[B[B[B)V`), so its signature must not change.
+   *
+   * Each byte array is a serialized type that is decoded with `ConverterUtils.parseFromBytes`
+   * before delegating to the typed overload.
+   *
+   * @param name              UDAF name
+   * @param returnType        serialized result type
+   * @param argTypes          serialized struct of argument types
+   * @param intermediateTypes serialized struct of intermediate (buffer) types
+   */
+  def registerUDAF(
+      name: String,
+      returnType: Array[Byte],
+      argTypes: Array[Byte],
+      intermediateTypes: Array[Byte]): Unit = {
+    val decodedReturn = ConverterUtils.parseFromBytes(returnType)
+    val decodedArgs = ConverterUtils.parseFromBytes(argTypes)
+    val decodedIntermediate = ConverterUtils.parseFromBytes(intermediateTypes)
+    registerUDAF(name, decodedReturn, decodedArgs, decodedIntermediate)
+  }
+
+  /**
+   * Records a UDAF signature.
+   *
+   * The key stored in `UDAFMap` is `(name, argument data types)`. The value pairs the return
+   * type with one buffer attribute per intermediate field, named `inter_0`, `inter_1`, ...
+   * The name is also added to `UDAFNames` so a function builder gets registered for it.
+   *
+   * Both `argTypes` and `intermediateTypes` must be struct types (asserted).
+   */
+  private def registerUDAF(
+      name: String,
+      returnType: ExpressionType,
+      argTypes: ExpressionType,
+      intermediateTypes: ExpressionType): Unit = {
+    assert(argTypes.dataType.isInstanceOf[StructType])
+    assert(intermediateTypes.dataType.isInstanceOf[StructType])
+
+    val argStruct = argTypes.dataType.asInstanceOf[StructType]
+    val intermediateStruct = intermediateTypes.dataType.asInstanceOf[StructType]
+
+    val aggBufferAttributes = intermediateStruct.fields.zipWithIndex.map {
+      case (field, position) =>
+        AttributeReference(s"inter_$position", field.dataType, field.nullable)()
+    }
+
+    UDAFMap.put((name, argStruct.fields.map(_.dataType)), (returnType, aggBufferAttributes))
+    UDAFNames += name
+    logInfo(s"Registered UDAF: $name($argTypes) -> $returnType")
}
def parseName(name: String): (String, String) = {
@@ -241,13 +310,41 @@ object UDFResolver extends Logging {
new FunctionIdentifier(name),
new ExpressionInfo(classOf[UDFExpression].getName, name),
(e: Seq[Expression]) => getUdfExpression(name)(e))
+ }.toSeq ++ UDAFNames.map {
+ name =>
+ (
+ new FunctionIdentifier(name),
+ new
ExpressionInfo(classOf[UserDefinedAggregateFunction].getName, name),
+ (e: Seq[Expression]) => getUdafExpression(name)(e))
}.toSeq
}
}
private def getUdfExpression(name: String)(children: Seq[Expression]) = {
val expressionType =
- UDFMap.getOrElse((name, children.map(_.dataType)), throw new
IllegalStateException())
+ UDFMap.getOrElse(
+ (name, children.map(_.dataType)),
+ throw new UnsupportedOperationException(
+ s"UDF $name -> ${children.map(_.dataType.simpleString).mkString(",
")} " +
+ s"is not registered.")
+ )
UDFExpression(name, expressionType.dataType, expressionType.nullable,
children)
}
+
+  /**
+   * Function builder for a registered UDAF: resolves the signature by `(name, argument types)`
+   * and wraps it in a [[UserDefinedAggregateFunction]].
+   *
+   * @throws UnsupportedOperationException if no UDAF with these argument types is registered
+   */
+  private def getUdafExpression(name: String)(children: Seq[Expression]) = {
+    val (expressionType, aggBufferAttributes) =
+      UDAFMap.getOrElse(
+        (name, children.map(_.dataType)),
+        throw new UnsupportedOperationException(
+          s"UDAF $name -> ${children.map(_.dataType.simpleString).mkString(", ")} " +
+            s"is not registered.")
+      )
+
+    // UDAFMap stores a single Seq of buffer attributes per signature. Give every expression its
+    // own exprIds so that two calls with the same signature in one query (e.g. `myavg(a),
+    // myavg(b)`) do not end up with colliding aggregation-buffer attributes.
+    UserDefinedAggregateFunction(
+      name,
+      expressionType.dataType,
+      expressionType.nullable,
+      children,
+      aggBufferAttributes.map(_.newInstance()))
+  }
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala
index e12b3df2e..a1c50f610 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala
@@ -79,7 +79,22 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with
SQLHelper {
| mydate(cast('2024-03-25' as date), 5)
|""".stripMargin)
df.collect()
- assert(df.collect().sameElements(Array(Row(6, 6L, 105,
Date.valueOf("2024-03-30")))))
+ assert(
+ df.collect()
+ .sameElements(Array(Row(6, 6L, 105, Date.valueOf("2024-03-30")))))
+ }
+
+  // Each myavg call hits a different registered signature (int, bigint, float, double);
+  // the average of a single 1 must be 1.0 in every case.
+  test("test udaf") {
+    val df = spark.sql("""select
+      | myavg(1),
+      | myavg(1L),
+      | myavg(cast(1.0 as float)),
+      | myavg(cast(1.0 as double))
+      |""".stripMargin)
+    // Collect once: a separate bare df.collect() would execute the query a second time.
+    assert(df.collect().sameElements(Array(Row(1.0, 1.0, 1.0, 1.0))))
}
}
diff --git a/cpp/velox/jni/JniUdf.cc b/cpp/velox/jni/JniUdf.cc
index 7fa6c341e..cd5a4f7c8 100644
--- a/cpp/velox/jni/JniUdf.cc
+++ b/cpp/velox/jni/JniUdf.cc
@@ -28,6 +28,7 @@ const std::string kUdfResolverClassPath =
"Lorg/apache/spark/sql/expression/UDFR
static jclass udfResolverClass;
static jmethodID registerUDFMethod;
+static jmethodID registerUDAFMethod;
} // namespace
@@ -41,6 +42,7 @@ void gluten::initVeloxJniUDF(JNIEnv* env) {
// methods
registerUDFMethod = getMethodIdOrError(env, udfResolverClass, "registerUDF",
"(Ljava/lang/String;[B[B)V");
+ registerUDAFMethod = getMethodIdOrError(env, udfResolverClass,
"registerUDAF", "(Ljava/lang/String;[B[B[B)V");
}
void gluten::finalizeVeloxJniUDF(JNIEnv* env) {
@@ -61,7 +63,17 @@ void gluten::jniGetFunctionSignatures(JNIEnv* env) {
argTypes, 0, signature->argTypes.length(), reinterpret_cast<const
jbyte*>(signature->argTypes.c_str()));
jobject instance = env->GetStaticObjectField(
udfResolverClass, env->GetStaticFieldID(udfResolverClass, "MODULE$",
kUdfResolverClassPath.c_str()));
- env->CallVoidMethod(instance, registerUDFMethod, name, returnType,
argTypes);
+ if (!signature->intermediateType.empty()) {
+ jbyteArray intermediateType =
env->NewByteArray(signature->intermediateType.length());
+ env->SetByteArrayRegion(
+ intermediateType,
+ 0,
+ signature->intermediateType.length(),
+ reinterpret_cast<const jbyte*>(signature->intermediateType.c_str()));
+ env->CallVoidMethod(instance, registerUDAFMethod, name, returnType,
argTypes, intermediateType);
+ } else {
+ env->CallVoidMethod(instance, registerUDFMethod, name, returnType,
argTypes);
+ }
checkException(env);
}
}
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 0fd767f6a..8401e7d8a 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -20,6 +20,7 @@
#include <re2/re2.h>
#include <string>
#include "TypeUtils.h"
+#include "udf/UdfLoader.h"
#include "utils/Common.h"
#include "velox/core/ExpressionEvaluator.h"
#include "velox/exec/Aggregate.h"
@@ -1148,9 +1149,11 @@ bool SubstraitToVeloxPlanValidator::validate(const
::substrait::AggregateRel& ag
"skewness",
"kurtosis"};
+ auto udfFuncs = UdfLoader::getInstance()->getRegisteredUdafNames();
+
for (const auto& funcSpec : funcSpecs) {
auto funcName = SubstraitParser::getNameBeforeDelimiter(funcSpec);
- if (supportedAggFuncs.find(funcName) == supportedAggFuncs.end()) {
+ if (supportedAggFuncs.find(funcName) == supportedAggFuncs.end() &&
udfFuncs.find(funcName) == udfFuncs.end()) {
LOG_VALIDATION_MSG(funcName + " was not supported in AggregateRel.");
return false;
}
diff --git a/cpp/velox/udf/examples/TestMyUDF.cc b/cpp/velox/udf/Udaf.h
similarity index 52%
copy from cpp/velox/udf/examples/TestMyUDF.cc
copy to cpp/velox/udf/Udaf.h
index e01c2f2c9..7e8f03402 100644
--- a/cpp/velox/udf/examples/TestMyUDF.cc
+++ b/cpp/velox/udf/Udaf.h
@@ -15,28 +15,26 @@
* limitations under the License.
*/
-#include <iostream>
-#include "udf/UdfLoader.h"
-
-#include "velox/expression/VectorFunction.h"
-
-int main() {
- auto udfLoader = gluten::UdfLoader::getInstance();
- udfLoader->loadUdfLibraries("libmyudf.so");
- udfLoader->registerUdf();
-
- auto map = facebook::velox::exec::vectorFunctionFactories();
- const std::string funcName = "myudf1";
- auto f = map.withRLock([&funcName](auto& self) ->
std::shared_ptr<facebook::velox::exec::VectorFunction> {
- auto iter = self.find(funcName);
- std::unordered_map<std::string, std::string> values;
- const facebook::velox::core::QueryConfig config(std::move(values));
- return iter->second.factory(funcName, {}, config);
- });
-
- if (!f) {
- return 1;
- }
-
- return 0;
-}
+#pragma once
+
+#include <cstddef>
+
+namespace gluten {
+
+// One exported UDAF signature. A UDAF library fills a caller-allocated array of these in
+// GLUTEN_GET_UDAF_ENTRIES; UdfLoader turns each entry into a UdfSignature.
+struct UdafEntry {
+  // Function name as seen from Spark SQL.
+  const char* name;
+  // Result type string (e.g. "double").
+  const char* dataType;
+
+  // Number of elements in argTypes.
+  std::size_t numArgs;
+  // Argument type strings, one per argument.
+  const char** argTypes;
+
+  // Type string of the intermediate (partial-aggregation) state,
+  // e.g. "struct<a:double,b:bigint>".
+  const char* intermediateType{nullptr};
+};
+
+// Returns the number of UdafEntry records GLUTEN_GET_UDAF_ENTRIES will write.
+#define GLUTEN_GET_NUM_UDAF getNumUdaf
+#define DEFINE_GET_NUM_UDAF extern "C" int GLUTEN_GET_NUM_UDAF()
+
+// Fills an array of GLUTEN_GET_NUM_UDAF() entries. The symbol name is deliberately distinct
+// from the UDF entries hook so that a single library can export both UDFs and UDAFs.
+#define GLUTEN_GET_UDAF_ENTRIES getUdafEntries
+#define DEFINE_GET_UDAF_ENTRIES extern "C" void GLUTEN_GET_UDAF_ENTRIES(gluten::UdafEntry* udafEntries)
+
+// NOTE(review): this expands to `registerUdf`, presumably the same symbol as the UDF
+// registration hook in Udf.h. A library exporting both UDFs and UDAFs must therefore register
+// everything from one function -- confirm against UdfLoader::registerUdf.
+#define GLUTEN_REGISTER_UDAF registerUdf
+#define DEFINE_REGISTER_UDAF extern "C" void GLUTEN_REGISTER_UDAF()
+} // namespace gluten
diff --git a/cpp/velox/udf/UdfLoader.cc b/cpp/velox/udf/UdfLoader.cc
index 3025b7388..a8a99ce9f 100644
--- a/cpp/velox/udf/UdfLoader.cc
+++ b/cpp/velox/udf/UdfLoader.cc
@@ -17,12 +17,12 @@
#include <dlfcn.h>
#include <google/protobuf/arena.h>
-#include <velox/expression/SignatureBinder.h>
-#include <velox/expression/VectorFunction.h>
-#include <velox/type/fbhive/HiveTypeParser.h>
-#include <functional>
#include <vector>
+#include "velox/expression/SignatureBinder.h"
+#include "velox/expression/VectorFunction.h"
+#include "velox/type/fbhive/HiveTypeParser.h"
+#include "Udaf.h"
#include "Udf.h"
#include "UdfLoader.h"
#include "utils/StringUtil.h"
@@ -31,9 +31,13 @@
namespace {
-void* loadSymFromLibrary(void* handle, const std::string& libPath, const
std::string& func) {
+void* loadSymFromLibrary(
+ void* handle,
+ const std::string& libPath,
+ const std::string& func,
+ bool throwIfNotFound = true) {
void* sym = dlsym(handle, func.c_str());
- if (!sym) {
+ if (!sym && throwIfNotFound) {
throw gluten::GlutenException(func + " not found in " + libPath);
}
return sym;
@@ -59,29 +63,79 @@ void UdfLoader::loadUdfLibraries0(const
std::vector<std::string>& libPaths) {
}
std::unordered_set<std::shared_ptr<UdfLoader::UdfSignature>>
UdfLoader::getRegisteredUdfSignatures() {
- std::unordered_set<std::shared_ptr<UdfSignature>> signatures;
+ if (!signatures_.empty()) {
+ return signatures_;
+ }
for (const auto& item : handles_) {
const auto& libPath = item.first;
const auto& handle = item.second;
- void* getNumUdfSym = loadSymFromLibrary(handle, libPath,
GLUTEN_TOSTRING(GLUTEN_GET_NUM_UDF));
- auto getNumUdf = reinterpret_cast<int (*)()>(getNumUdfSym);
- // allocate
- int numUdf = getNumUdf();
- UdfEntry* udfEntry = static_cast<UdfEntry*>(malloc(sizeof(UdfEntry) *
numUdf));
-
- void* getUdfEntriesSym = loadSymFromLibrary(handle, libPath,
GLUTEN_TOSTRING(GLUTEN_GET_UDF_ENTRIES));
- auto getUdfEntries = reinterpret_cast<void
(*)(UdfEntry*)>(getUdfEntriesSym);
- getUdfEntries(udfEntry);
-
- for (auto i = 0; i < numUdf; ++i) {
- const auto& entry = udfEntry[i];
- auto dataType = toSubstraitTypeStr(entry.dataType);
- auto argTypes = toSubstraitTypeStr(entry.numArgs, entry.argTypes);
- signatures.insert(std::make_shared<UdfSignature>(entry.name, dataType,
argTypes));
+
+ // Handle UDFs.
+ void* getNumUdfSym = loadSymFromLibrary(handle, libPath,
GLUTEN_TOSTRING(GLUTEN_GET_NUM_UDF), false);
+ if (getNumUdfSym) {
+ auto getNumUdf = reinterpret_cast<int (*)()>(getNumUdfSym);
+ int numUdf = getNumUdf();
+ // allocate
+ UdfEntry* udfEntries = static_cast<UdfEntry*>(malloc(sizeof(UdfEntry) *
numUdf));
+
+ void* getUdfEntriesSym = loadSymFromLibrary(handle, libPath,
GLUTEN_TOSTRING(GLUTEN_GET_UDF_ENTRIES));
+ auto getUdfEntries = reinterpret_cast<void
(*)(UdfEntry*)>(getUdfEntriesSym);
+ getUdfEntries(udfEntries);
+
+ for (auto i = 0; i < numUdf; ++i) {
+ const auto& entry = udfEntries[i];
+ auto dataType = toSubstraitTypeStr(entry.dataType);
+ auto argTypes = toSubstraitTypeStr(entry.numArgs, entry.argTypes);
+ signatures_.insert(std::make_shared<UdfSignature>(entry.name,
dataType, argTypes));
+ }
+ free(udfEntries);
+ } else {
+ LOG(INFO) << "No UDFs found in " << libPath;
+ }
+
+ // Handle UDAFs.
+ void* getNumUdafSym = loadSymFromLibrary(handle, libPath,
GLUTEN_TOSTRING(GLUTEN_GET_NUM_UDAF), false);
+ if (getNumUdafSym) {
+ auto getNumUdaf = reinterpret_cast<int (*)()>(getNumUdafSym);
+ int numUdaf = getNumUdaf();
+ // allocate
+ UdafEntry* udafEntries =
static_cast<UdafEntry*>(malloc(sizeof(UdafEntry) * numUdaf));
+
+ void* getUdafEntriesSym = loadSymFromLibrary(handle, libPath,
GLUTEN_TOSTRING(GLUTEN_GET_UDAF_ENTRIES));
+ auto getUdafEntries = reinterpret_cast<void
(*)(UdafEntry*)>(getUdafEntriesSym);
+ getUdafEntries(udafEntries);
+
+ for (auto i = 0; i < numUdaf; ++i) {
+ const auto& entry = udafEntries[i];
+ auto dataType = toSubstraitTypeStr(entry.dataType);
+ auto argTypes = toSubstraitTypeStr(entry.numArgs, entry.argTypes);
+ auto intermediateType = toSubstraitTypeStr(entry.intermediateType);
+ signatures_.insert(std::make_shared<UdfSignature>(entry.name,
dataType, argTypes, intermediateType));
+ }
+ free(udafEntries);
+ } else {
+ LOG(INFO) << "No UDAFs found in " << libPath;
}
- free(udfEntry);
}
- return signatures;
+ return signatures_;
+}
+
+// Names of all loaded functions that are aggregates, i.e. whose signature carries an
+// intermediate type. The result is cached in names_ once it is non-empty; signatures are
+// loaded on demand if they have not been collected yet.
+std::unordered_set<std::string> UdfLoader::getRegisteredUdafNames() {
+  // No libraries loaded: nothing to report, nothing to cache.
+  if (handles_.empty()) {
+    return {};
+  }
+  if (names_.empty()) {
+    if (signatures_.empty()) {
+      getRegisteredUdfSignatures();
+    }
+    for (const auto& signature : signatures_) {
+      const bool isAggregate = !signature->intermediateType.empty();
+      if (isAggregate) {
+        names_.insert(signature->name);
+      }
+    }
+  }
+  return names_;
}
void UdfLoader::registerUdf() {
@@ -92,23 +146,9 @@ void UdfLoader::registerUdf() {
}
}
-bool UdfLoader::validateUdf(const std::string& name, const
std::vector<facebook::velox::TypePtr>& argTypes) {
- const auto& functionMap = facebook::velox::exec::vectorFunctionFactories();
- auto got = functionMap->find(name);
- if (got != functionMap->end()) {
- const auto& entry = got->second;
- for (auto& signature : entry.signatures) {
- facebook::velox::exec::SignatureBinder binder(*signature, argTypes);
- if (binder.tryBind()) {
- return true;
- }
- }
- }
- return false;
-}
-
std::shared_ptr<UdfLoader> UdfLoader::getInstance() {
static auto instance = std::make_shared<UdfLoader>();
return instance;
}
+
} // namespace gluten
diff --git a/cpp/velox/udf/UdfLoader.h b/cpp/velox/udf/UdfLoader.h
index 8e2004853..31098d2f4 100644
--- a/cpp/velox/udf/UdfLoader.h
+++ b/cpp/velox/udf/UdfLoader.h
@@ -34,9 +34,14 @@ class UdfLoader {
std::string returnType;
std::string argTypes;
+ std::string intermediateType{};
+
UdfSignature(std::string name, std::string returnType, std::string
argTypes)
: name(name), returnType(returnType), argTypes(argTypes) {}
+ UdfSignature(std::string name, std::string returnType, std::string
argTypes, std::string intermediateType)
+ : name(name), returnType(returnType), argTypes(argTypes),
intermediateType(intermediateType) {}
+
~UdfSignature() = default;
};
@@ -46,10 +51,9 @@ class UdfLoader {
std::unordered_set<std::shared_ptr<UdfSignature>>
getRegisteredUdfSignatures();
- void registerUdf();
+ std::unordered_set<std::string> getRegisteredUdafNames();
- // unused
- bool validateUdf(const std::string& name, const
std::vector<facebook::velox::TypePtr>& argTypes);
+ void registerUdf();
private:
void loadUdfLibraries0(const std::vector<std::string>& libPaths);
@@ -81,5 +85,8 @@ class UdfLoader {
facebook::velox::type::fbhive::HiveTypeParser parser_{};
google::protobuf::Arena arena_{};
VeloxToSubstraitTypeConvertor convertor_{};
+
+ std::unordered_set<std::shared_ptr<UdfSignature>> signatures_;
+ std::unordered_set<std::string> names_;
};
} // namespace gluten
diff --git a/cpp/velox/udf/examples/CMakeLists.txt
b/cpp/velox/udf/examples/CMakeLists.txt
index 20fd9c54e..3c51859ee 100644
--- a/cpp/velox/udf/examples/CMakeLists.txt
+++ b/cpp/velox/udf/examples/CMakeLists.txt
@@ -13,8 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-add_library(myudf SHARED "MyUDF.cpp")
+add_library(myudf SHARED "MyUDF.cc")
target_link_libraries(myudf velox)
+add_library(myudaf SHARED "MyUDAF.cc")
+target_link_libraries(myudaf velox)
+
add_executable(test_myudf "TestMyUDF.cc")
-target_link_libraries(test_myudf myudf)
+target_link_libraries(test_myudf velox)
diff --git a/cpp/velox/udf/examples/MyUDAF.cc b/cpp/velox/udf/examples/MyUDAF.cc
new file mode 100644
index 000000000..e6c4b1fea
--- /dev/null
+++ b/cpp/velox/udf/examples/MyUDAF.cc
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <velox/exec/SimpleAggregateAdapter.h>
+#include <velox/expression/VectorFunction.h>
+#include <velox/functions/Macros.h>
+#include <velox/functions/Registerer.h>
+#include <velox/functions/lib/aggregates/AverageAggregateBase.h>
+#include <iostream>
+#include "udf/Udaf.h"
+
+using namespace facebook::velox;
+using namespace facebook::velox::exec;
+
+namespace {
+
+// Type-name strings used in the exported UdafEntry table at the end of this file
+// (presumably Hive-style names parsed by UdfLoader -- verify).
+static const char* kInteger = "int";
+static const char* kBigInt = "bigint";
+static const char* kFloat = "float";
+static const char* kDouble = "double";
+
+// Copied from velox/exec/tests/SimpleAverageAggregate.cpp
+
+// Implementation of the average aggregation function through the
+// SimpleAggregateAdapter.
+// Simple-aggregate implementation of the arithmetic mean for input type T. It keeps a
+// (double sum, int64_t count) accumulator and is meant to be wrapped in
+// SimpleAggregateAdapter<AverageAggregate<T>>.
+template <typename T>
+class AverageAggregate {
+ public:
+ // Type(s) of input vector(s) wrapped in Row.
+ using InputType = Row<T>;
+
+ // Type of intermediate result vector wrapped in Row.
+ using IntermediateType =
+ Row</*sum*/ double,
+ /*count*/ int64_t>;
+
+ // Type of output vector.
+ // NOTE(review): for T = float this is float, while the UdafEntry table in this file
+ // advertises kDouble for the float signature; the double result relies on the non-raw-input
+ // step being instantiated as AverageAggregate<double> -- confirm for single-step plans.
+ using OutputType = std::conditional_t<std::is_same_v<T, float>, float,
double>;
+
+ // Converts one raw input value directly into an intermediate (value, 1) pair.
+ static bool toIntermediate(exec::out_type<Row<double, int64_t>>& out,
exec::arg_type<T> in) {
+ out.copy_from(std::make_tuple(static_cast<double>(in), 1));
+ return true;
+ }
+
+ // Per-group state: running sum and number of accumulated values.
+ struct AccumulatorType {
+ double sum_;
+ int64_t count_;
+
+ AccumulatorType() = delete;
+
+ // Constructor used in initializeNewGroups().
+ explicit AccumulatorType(HashStringAllocator* /*allocator*/) {
+ sum_ = 0;
+ count_ = 0;
+ }
+
+ // addInput expects one parameter of exec::arg_type<T> for each child-type T
+ // wrapped in InputType.
+ void addInput(HashStringAllocator* /*allocator*/, exec::arg_type<T> data) {
+ sum_ += data;
+ // checkedPlus guards the count against int64 overflow.
+ count_ = checkedPlus<int64_t>(count_, 1);
+ }
+
+ // combine expects one parameter of exec::arg_type<IntermediateType>.
+ void combine(HashStringAllocator* /*allocator*/,
exec::arg_type<Row<double, int64_t>> other) {
+ // Both fields of an intermediate result should be non-null because
+ // writeIntermediateResult() never makes an intermediate result with a
+ // single null.
+ VELOX_CHECK(other.at<0>().has_value());
+ VELOX_CHECK(other.at<1>().has_value());
+ sum_ += other.at<0>().value();
+ count_ = checkedPlus<int64_t>(count_, other.at<1>().value());
+ }
+
+ // Final result: sum / count.
+ bool writeFinalResult(exec::out_type<OutputType>& out) {
+ out = sum_ / count_;
+ return true;
+ }
+
+ // Intermediate result: the (sum, count) pair.
+ bool writeIntermediateResult(exec::out_type<IntermediateType>& out) {
+ out = std::make_tuple(sum_, count_);
+ return true;
+ }
+ };
+};
+
+// Registers `name` as a Velox aggregate computing the mean via
+// SimpleAggregateAdapter<AverageAggregate<T>>. Companion functions are registered too, and an
+// existing registration with the same name is overwritten.
+//
+// For raw input the instantiation is chosen from the argument type. For the other steps
+// (intermediate input) it is chosen from the result type.
+//
+// NOTE(review): the "real" signature returns "real", while the UdafEntry table below
+// advertises kDouble for the float argument -- confirm this mismatch is intended.
+exec::AggregateRegistrationResult registerSimpleAverageAggregate(const std::string& name) {
+  std::vector<std::shared_ptr<exec::AggregateFunctionSignature>> signatures;
+
+  for (const auto& inputType : {"smallint", "integer", "bigint", "double"}) {
+    signatures.push_back(exec::AggregateFunctionSignatureBuilder()
+                             .returnType("double")
+                             .intermediateType("row(double,bigint)")
+                             .argumentType(inputType)
+                             .build());
+  }
+
+  signatures.push_back(exec::AggregateFunctionSignatureBuilder()
+                           .returnType("real")
+                           .intermediateType("row(double,bigint)")
+                           .argumentType("real")
+                           .build());
+
+  return exec::registerAggregateFunction(
+      name,
+      std::move(signatures),
+      [name](
+          core::AggregationNode::Step step,
+          const std::vector<TypePtr>& argTypes,
+          const TypePtr& resultType,
+          const core::QueryConfig& /*config*/) -> std::unique_ptr<exec::Aggregate> {
+        // Exactly one argument is required: argTypes[0] is read unconditionally below, so the
+        // previous "at most one" check let an empty argTypes index out of bounds.
+        VELOX_CHECK_EQ(argTypes.size(), 1, "{} takes exactly one argument", name);
+        auto inputType = argTypes[0];
+        if (exec::isRawInput(step)) {
+          switch (inputType->kind()) {
+            case TypeKind::SMALLINT:
+              return std::make_unique<SimpleAggregateAdapter<AverageAggregate<int16_t>>>(resultType);
+            case TypeKind::INTEGER:
+              return std::make_unique<SimpleAggregateAdapter<AverageAggregate<int32_t>>>(resultType);
+            case TypeKind::BIGINT:
+              return std::make_unique<SimpleAggregateAdapter<AverageAggregate<int64_t>>>(resultType);
+            case TypeKind::REAL:
+              return std::make_unique<SimpleAggregateAdapter<AverageAggregate<float>>>(resultType);
+            case TypeKind::DOUBLE:
+              return std::make_unique<SimpleAggregateAdapter<AverageAggregate<double>>>(resultType);
+            default:
+              VELOX_FAIL("Unknown input type for {} aggregation {}", name, inputType->kindName());
+          }
+        } else {
+          switch (resultType->kind()) {
+            case TypeKind::REAL:
+              return std::make_unique<SimpleAggregateAdapter<AverageAggregate<float>>>(resultType);
+            case TypeKind::DOUBLE:
+            case TypeKind::ROW:
+              return std::make_unique<SimpleAggregateAdapter<AverageAggregate<double>>>(resultType);
+            default:
+              VELOX_FAIL("Unsupported result type for final aggregation: {}", resultType->kindName());
+          }
+        }
+      },
+      true /*registerCompanionFunctions*/,
+      true /*overwrite*/);
+}
+} // namespace
+
+// Argument-type arrays for the four exported "myavg" signatures (one argument each).
+const char* myAvgArg1[] = {kInteger};
+const char* myAvgArg2[] = {kBigInt};
+const char* myAvgArg3[] = {kFloat};
+const char* myAvgArg4[] = {kDouble};
+// Intermediate state shared by every signature: (sum, count).
+const char* myAvgIntermediateType = "struct<a:double,b:bigint>";
+
+// Number of UdafEntry records written by the entries hook below.
+const int kNumMyUdaf = 4;
+
+DEFINE_GET_NUM_UDAF {
+  return kNumMyUdaf;
+}
+
+// Writes one entry per argument type, in the order int, bigint, float, double; all of them
+// return double.
+DEFINE_GET_UDAF_ENTRIES {
+  const char** const argTypesPerEntry[kNumMyUdaf] = {myAvgArg1, myAvgArg2, myAvgArg3, myAvgArg4};
+  for (int slot = 0; slot < kNumMyUdaf; ++slot) {
+    udafEntries[slot] = {"myavg", kDouble, 1, argTypesPerEntry[slot], myAvgIntermediateType};
+  }
+}
+
+// Registers the Velox implementation under the same name the entries advertise.
+DEFINE_REGISTER_UDAF {
+  registerSimpleAverageAggregate("myavg");
+}
diff --git a/cpp/velox/udf/examples/MyUDF.cpp b/cpp/velox/udf/examples/MyUDF.cc
similarity index 93%
rename from cpp/velox/udf/examples/MyUDF.cpp
rename to cpp/velox/udf/examples/MyUDF.cc
index 5115a2d12..578e3effb 100644
--- a/cpp/velox/udf/examples/MyUDF.cpp
+++ b/cpp/velox/udf/examples/MyUDF.cc
@@ -24,6 +24,7 @@
namespace {
using namespace facebook::velox;
+using namespace facebook::velox::exec;
static const char* kInteger = "int";
static const char* kBigInt = "bigint";
@@ -118,10 +119,11 @@ const char* myUdf1Arg2[] = {kBigInt};
const char* myUdf2Arg1[] = {kBigInt};
const char* myDateArg[] = {kDate, kInteger};
DEFINE_GET_UDF_ENTRIES {
- udfEntries[0] = {"myudf1", kInteger, 1, myUdf1Arg1};
- udfEntries[1] = {"myudf1", kBigInt, 1, myUdf1Arg2};
- udfEntries[2] = {"myudf2", kBigInt, 1, myUdf2Arg1};
- udfEntries[3] = {"mydate", kDate, 2, myDateArg};
+ int index = 0;
+ udfEntries[index++] = {"myudf1", kInteger, 1, myUdf1Arg1};
+ udfEntries[index++] = {"myudf1", kBigInt, 1, myUdf1Arg2};
+ udfEntries[index++] = {"myudf2", kBigInt, 1, myUdf2Arg1};
+ udfEntries[index++] = {"mydate", kDate, 2, myDateArg};
}
DEFINE_REGISTER_UDF {
diff --git a/cpp/velox/udf/examples/TestMyUDF.cc
b/cpp/velox/udf/examples/TestMyUDF.cc
index e01c2f2c9..794dd0613 100644
--- a/cpp/velox/udf/examples/TestMyUDF.cc
+++ b/cpp/velox/udf/examples/TestMyUDF.cc
@@ -26,12 +26,15 @@ int main() {
udfLoader->registerUdf();
auto map = facebook::velox::exec::vectorFunctionFactories();
- const std::string funcName = "myudf1";
- auto f = map.withRLock([&funcName](auto& self) ->
std::shared_ptr<facebook::velox::exec::VectorFunction> {
- auto iter = self.find(funcName);
- std::unordered_map<std::string, std::string> values;
- const facebook::velox::core::QueryConfig config(std::move(values));
- return iter->second.factory(funcName, {}, config);
+ const std::vector<std::string> candidates = {"myudf1", "myudf2"};
+ auto f = map.withRLock([&candidates](auto& self) -> bool {
+ return std::all_of(candidates.begin(), candidates.end(), [&](const auto&
funcName) {
+ auto iter = self.find(funcName);
+ std::unordered_map<std::string, std::string> values;
+ const facebook::velox::core::QueryConfig config(std::move(values));
+ return iter->second.factory(
+ funcName,
{facebook::velox::exec::VectorFunctionArg{facebook::velox::BIGINT()}}, config)
!= nullptr;
+ });
});
if (!f) {
diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md
index df0c6ec05..156c94694 100644
--- a/docs/get-started/Velox.md
+++ b/docs/get-started/Velox.md
@@ -4,6 +4,7 @@ title: Gluten with Velox Backend
nav_order: 1
parent: Getting-Started
---
+
# Supported Version
| Type | Version |
@@ -15,10 +16,10 @@ parent: Getting-Started
# Prerequisite
-Currently, Gluten+Velox backend is only tested on
**Ubuntu20.04/Ubuntu22.04/Centos7/Centos8**. Other kinds of OS support are
still in progress. The long term goal is to support several
-common OS and conda env deployment.
+Currently, Gluten+Velox backend is only tested on **Ubuntu20.04/Ubuntu22.04/Centos7/Centos8**.
+Support for other operating systems is still in progress. The long-term goal is to support several common operating systems and conda environment deployment.
-Gluten only fully tested in CI with 3.2.2, 3.3.1 and 3.4.2. We will add/update
supported/tested versions according to the upstream changes.
+Gluten is only fully tested in CI with 3.2.2, 3.3.1 and 3.4.2. We will add/update supported/tested versions according to the upstream changes.
We need to set up the `JAVA_HOME` env. Currently, Gluten supports **java 8**
and **java 17**.
@@ -120,6 +121,7 @@ With config `enable_vcpkg=OFF`, the dependency libraries
won't be statically lin
Hadoop hdfs support is ready via the
[libhdfs3](https://github.com/apache/hawq/tree/master/depends/libhdfs3)
library. The libhdfs3 provides native API for Hadoop I/O without the drawbacks
of JNI. It also provides advanced authentication like Kerberos based. Please
note this library has several dependencies which may require extra
installations on Driver and Worker node.
### Build with HDFS support
+
To build Gluten with HDFS support, below command is suggested:
```bash
@@ -128,7 +130,7 @@ cd /path/to/gluten
```
### Configuration about HDFS support
-
+
HDFS uris (hdfs://host:port) will be extracted from a valid hdfs file path to
initialize hdfs client, you do not need to specify it explicitly.
libhdfs3 need a configuration file and [example
here](https://github.com/apache/hawq/blob/e9d43144f7e947e071bba48871af9da354d177d0/src/backend/utils/misc/etc/hdfs-client.xml),
this file is a bit different from hdfs-site.xml and core-site.xml.
@@ -159,12 +161,12 @@ You also need to add configuration to the "hdfs-site.xml"
as below:
```
<property>
- <name>dfs.client.read.shortcircuit</name>
- <value>true</value>
+ <name>dfs.client.read.shortcircuit</name>
+ <value>true</value>
</property>
<property>
- <name>dfs.domain.socket.path</name>
- <value>/var/lib/hadoop-hdfs/dn_socket</value>
+ <name>dfs.domain.socket.path</name>
+ <value>/var/lib/hadoop-hdfs/dn_socket</value>
</property>
```
@@ -174,10 +176,10 @@ Here are two steps to enable kerberos.
- Make sure the hdfs-client.xml contains
-```xml
+```
<property>
- <name>hadoop.security.authentication</name>
- <value>kerberos</value>
+ <name>hadoop.security.authentication</name>
+ <value>kerberos</value>
</property>
```
@@ -290,6 +292,7 @@ The gluten-iceberg jar is in `gluten-iceberg/target`
directory.
After the two steps, you can query iceberg table by gluten/velox without
scan's fallback.
# Coverage
+
Spark3.3 has 387 functions in total. ~240 are commonly used. Velox's functions
have two category, Presto and Spark. Presto has 124 functions implemented.
Spark has 62 functions. Spark functions are verified to have the same result as
Vanilla Spark. Some Presto functions have the same result as Vanilla Spark but
some others have different. Gluten prefer to use Spark functions firstly. If
it's not in Spark's list but implemented in Presto, we currently offload to
Presto one until we noted [...]
> Velox doesn't support [ANSI
> mode](https://spark.apache.org/docs/latest/sql-ref-ansi-compliance.html)),
> so as Gluten. Once ANSI mode is enabled in Spark config, Gluten will
> fallback to Vanilla Spark.
@@ -347,32 +350,45 @@ Using the following configuration options to customize
spilling:
| spark.gluten.sql.columnar.backend.velox.spillPartitionBits | 2
| The number of bits used to calculate the spilling partition
number. The number of spilling partitions will be power of two
|
| spark.gluten.sql.columnar.backend.velox.spillableReservationGrowthPct |
25 | The spillable memory reservation growth percentage of the
previous memory reservation size
|
| spark.gluten.sql.columnar.backend.velox.spillThreadNum | 0
| (Experimental) The thread num of a dedicated thread pool to do
spill
+
# Velox User-Defined Functions (UDF)
## Introduction
-Velox backend supports User-Defined Functions (UDF). Users can create their
own functions using the UDF interface provided in Velox backend and build
libraries for these functions. At runtime, the UDF are registered at the start
of applications. Once registered, Gluten will be able to parse and offload
these UDF into Velox during execution.
+Velox backend supports User-Defined Functions (UDF) and User-Defined Aggregate Functions (UDAF).
+Users can create their own functions using the UDF interface provided in Velox backend and build libraries for these functions.
+At runtime, the UDFs are registered when the application starts.
+Once registered, Gluten will be able to parse and offload these UDFs into Velox during execution.
-## Creating a UDF library
+## Create and Build UDF/UDAF library
The following steps demonstrate how to set up a UDF library project:
-- **Include the UDF Interface Header:** First, include the UDF interface
header file [Udf.h](../../cpp/velox/udf/Udf.h) in the project file. The header
file defines the `UdfEntry` struct, along with the macros for declaring the
necessary functions to integrate the UDF into Gluten and Velox.
+- **Include the UDF Interface Header:**
+ First, include the UDF interface header file
[Udf.h](../../cpp/velox/udf/Udf.h) in the project file.
+ The header file defines the `UdfEntry` struct, along with the macros for
declaring the necessary functions to integrate the UDF into Gluten and Velox.
-- **Implement the UDF:** Implement UDF. These functions should be able to
register to Velox.
+- **Implement the UDF:**
+ Implement UDF. These functions should be able to register to Velox.
-- **Implement the Interface Functions:** Implement the following interface
functions that integrate UDF into Project Gluten:
+- **Implement the Interface Functions:**
+ Implement the following interface functions that integrate UDF into Project
Gluten:
- - `getNumUdf()`: This function should return the number of UDF in the
library. This is used to allocating udfEntries array as the argument for the
next function `getUdfEntries`.
+ - `getNumUdf()`:
+ This function should return the number of UDFs in the library.
+ This is used to allocate the udfEntries array that is passed as the argument to the next function `getUdfEntries`.
- - `getUdfEntries(gluten::UdfEntry* udfEntries)`: This function should
populate the provided udfEntries array with the details of the UDF, including
function names and signatures.
+ - `getUdfEntries(gluten::UdfEntry* udfEntries)`:
+ This function should populate the provided udfEntries array with the
details of the UDF, including function names and signatures.
- - `registerUdf()`: This function is called to register the UDF to Velox
function registry. This is where users should register functions by calling
`facebook::velox::exec::registerVecotorFunction` or other Velox APIs.
+ - `registerUdf()`:
+ This function is called to register the UDF to Velox function registry.
+ This is where users should register functions by calling `facebook::velox::exec::registerVectorFunction` or other Velox APIs.
- - The interface functions are mapping to marcos in
[Udf.h](../../cpp/velox/udf/Udf.h). Here's an example of how to implement these
functions:
+ - The interface functions are mapped to macros in [Udf.h](../../cpp/velox/udf/Udf.h). Here's an example of how to implement these functions:
```
- // Filename MyUDF.cpp
+ // Filename MyUDF.cc
#include <velox/expression/VectorFunction.h>
#include <velox/udf/Udf.h>
@@ -409,9 +425,9 @@ The following steps demonstrate how to set up a UDF library
project:
```
-## Building the UDF library
+To build the UDF library, users need to compile the C++ code and link to
`libvelox.so`.
+It's recommended to create a CMakeLists.txt for the project. Here's an example:
-To build the UDF library, users need to compile the C++ code and link to
`libvelox.so`. It's recommended to create a CMakeLists.txt for the project.
Here's an example:
```
project(myudf)
@@ -428,13 +444,25 @@ target_include_directories(myudf PRIVATE
${GLUTEN_HOME}/cpp ${GLUTEN_HOME}/ep/bu
target_link_libraries(myudf PRIVATE ${VELOX_LIBRARY})
```
-## Using UDF in Gluten
+The steps for creating and building a UDAF library are quite similar to those for a UDF library.
+The major difference is that a UDAF library includes the UDAF header file [Udaf.h](../../cpp/velox/udf/Udaf.h) and defines the following functions:
-Gluten loads the UDF libraries at runtime. You can upload UDF libraries via
`--files` or `--archives`, and configure the libray paths using the provided
Spark configuration, which accepts comma separated list of library paths.
+- `getNumUdaf()`
+- `getUdafEntries(gluten::UdafEntry* udafEntries)`
+- `registerUdaf()`
-Note if running on Yarn client mode, the uploaded files are not reachable on
driver side. Users should copy those files to somewhere reachable for driver
and set `spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths`. This
configuration is also useful when the `udfLibraryPaths` is different between
driver side and executor side.
+`gluten::UdafEntry` requires an additional field, `intermediateType`, which specifies the output type of the partial aggregation.
+For a detailed implementation, refer to the example code in [MyUDAF.cc](../../cpp/velox/udf/examples/MyUDAF.cc).
+
+## Using UDF/UDAF in Gluten
+
+Gluten loads the UDF libraries at runtime. You can upload UDF libraries via
`--files` or `--archives`, and configure the library paths using the provided
Spark configuration, which accepts comma separated list of library paths.
+
+Note that in Yarn client mode, the uploaded files are not reachable on the driver side. Users should copy those files to a location reachable by the driver and set `spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths`.
+This configuration is also useful when `udfLibraryPaths` differs between the driver side and the executor side.
- Use the `--files` option to upload a library and configure its relative path
+
```shell
--files /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
--conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=libmyudf.so
@@ -443,6 +471,7 @@ Note if running on Yarn client mode, the uploaded files are
not reachable on dri
```
- Use the `--archives` option to upload an archive and configure its relative
path
+
```shell
--archives /path/to/udf_archives.zip#udf_archives
--conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=udf_archives
@@ -450,34 +479,42 @@ Note if running on Yarn client mode, the uploaded files
are not reachable on dri
--conf
spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/udf_archives.zip
```
-- Only configure URI
+- Configure URI
You can also specify the local or HDFS URIs to the UDF libraries or archives.
Local URIs should exist on driver and every worker nodes.
+
```shell
---conf
spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=hdfs://path/to/library_or_archive
+--conf
spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=file:///path/to/library_or_archive
```
## Try the example
-We provided an Velox UDF example file
[MyUDF.cpp](../../cpp/velox/udf/examples/MyUDF.cpp). After building gluten cpp,
you can find the example library at
/path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
+We provide Velox UDF examples in [MyUDF.cc](../../cpp/velox/udf/examples/MyUDF.cc) and UDAF examples in [MyUDAF.cc](../../cpp/velox/udf/examples/MyUDAF.cc).
+After building gluten cpp, you can find the example libraries at `/path/to/gluten/cpp/build/velox/udf/examples/`.
+
+Start spark-shell or spark-sql with the configuration below:
-Start spark-shell or spark-sql with below configuration
```shell
# Use the `--files` option to upload a library and configure its relative path
--files /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
--conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=libmyudf.so
```
+
or
+
```shell
# Only configure URI
--conf
spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=file:///path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
```
Run query. The functions `myudf1` and `myudf2` increment the input value by a
constant of 5
+
```
select myudf1(1), myudf2(100L)
```
+
The output from spark-shell will be like
+
```
+----------------+------------------+
|udfexpression(1)|udfexpression(100)|
@@ -518,6 +555,7 @@ Gluten supports using Intel® QuickAssist Technology (QAT)
for data compression
This feature is based on QAT driver library and
[QATzip](https://github.com/intel/QATzip) library. Please manually download QAT
driver for your system, and follow its README to build and install on all
Driver and Worker node: [Intel® QuickAssist Technology Driver for Linux* – HW
Version
2.0](https://www.intel.com/content/www/us/en/download/765501/intel-quickassist-technology-driver-for-linux-hw-version-2-0.html?wapkw=quickassist).
## Software Requirements
+
- Download QAT driver for your system, and follow its README to build and
install on all Driver and Worker nodes: [Intel® QuickAssist Technology Driver
for Linux* – HW Version
2.0](https://www.intel.com/content/www/us/en/download/765501/intel-quickassist-technology-driver-for-linux-hw-version-2-0.html?wapkw=quickassist).
- Below compression libraries need to be installed on all Driver and Worker
nodes:
- Zlib* library of version 1.2.7 or higher
@@ -526,9 +564,7 @@ This feature is based on QAT driver library and
[QATzip](https://github.com/inte
## Build Gluten with QAT
-1. Setup ICP_ROOT environment variable to the directory where QAT driver is
extracted.
-This environment variable is required during building Gluten and running Spark
applications.
-It's recommended to put it in .bashrc on Driver and Worker nodes.
+1. Set the ICP_ROOT environment variable to the directory where the QAT driver is extracted. This environment variable is required when building Gluten and when running Spark applications. It's recommended to put it in .bashrc on Driver and Worker nodes.
```bash
echo "export ICP_ROOT=/path/to/QAT_driver" >> ~/.bashrc
@@ -540,9 +576,8 @@ echo "export ICP_ROOT=/path/to/QAT_driver" >> ~/.bashrc
exit
```
-2. **This step is required if your application is running as Non-root user**.
-The users must be added to the 'qat' group after QAT drvier is installed.
-And change the amount of max locked memory for the username that is included
in the group name. This can be done by specifying the limit in
/etc/security/limits.conf.
+2. **This step is required if your application is running as a non-root user.**
+ The users must be added to the 'qat' group after the QAT driver is installed. Then increase the max locked memory limit for the users in that group. This can be done by specifying the limit in /etc/security/limits.conf.
```bash
sudo su -
@@ -554,9 +589,7 @@ echo "@qat - memlock 500000" >> /etc/security/limits.conf
exit
```
-3. Enable huge page. This step is required to execute each time after system
reboot. We recommend using systemctl to manage at system startup.
-You change the values for "max_huge_pages" and "max_huge_pages_per_process" to
make sure there are enough resources for your workload.
-As for Spark applications, one process matches one executor. Within the
executor, every task is allocated a maximum of 5 huge pages.
+3. Enable huge pages. This step must be executed again after each system reboot; we recommend using systemctl to run it at system startup. You can change the values of "max_huge_pages" and "max_huge_pages_per_process" to make sure there are enough resources for your workload. For Spark applications, one process corresponds to one executor. Within the executor, every task is allocated a maximum of 5 huge pages.
```bash
sudo su -
@@ -638,7 +671,7 @@ There is 8 QAT acceleration device(s) in the system:
--conf spark.gluten.sql.columnar.shuffle.codecBackend=qat
```
-4. You can use below command to check whether QAT is working normally at
run-time. The value of fw_counters should continue to increase during shuffle.
+4. You can use the command below to check whether QAT is working normally at run-time. The value of fw_counters should keep increasing during shuffle.
```
while :; do cat /sys/kernel/debug/qat_4xxx_0000:6b:00.0/fw_counters; sleep 1;
done
@@ -674,7 +707,8 @@ This feature is based on Intel®
[QPL](https://github.com/intel/qpl).
Gluten will internally build and link to a specific version of QPL library,
but extra environment setup is still required. Please refer to [QPL
Installation
Guide](https://intel.github.io/qpl/documentation/get_started_docs/installation.html)
to install dependencies and configure accelerators.
-**This step is required if your application is running as Non-root user**.
Create a group for the users who have privilege to use IAA, and grant group iaa
read/write access to the IAA Work-Queues.
+**This step is required if your application is running as a non-root user.**
+Create a group for the users who have the privilege to use IAA, and grant the group iaa read/write access to the IAA Work-Queues.
```bash
sudo groupadd iaa
@@ -682,7 +716,7 @@ sudo usermod -aG iaa username # need to relogin
sudo chgrp -R iaa /dev/iax
sudo chmod -R g+rw /dev/iax
```
-
+
After the set-up, you can now build Gluten with QAT. Below command is used to
enable this feature
```bash
@@ -701,6 +735,7 @@ sudo ls -l /dev/iax
```
The output should be like:
+
```
total 0
crw-rw---- 1 root iaa 509, 0 Apr 5 18:54 wq1.0
@@ -732,7 +767,7 @@ Check out the [Intel® Query Processing Library (Intel® QPL)
Documentation](htt
# Test TPC-H or TPC-DS on Gluten with Velox backend
-All TPC-H and TPC-DS queries are supported in Gluten Velox backend.
+All TPC-H and TPC-DS queries are supported in Gluten Velox backend.
## Data preparation
@@ -743,6 +778,7 @@ The used TPC-H and TPC-DS queries are the original ones,
and can be accessed fro
and [TPC-H queries](../../gluten-core/src/test/resources/tpch-queries).
Some other versions of TPC-DS queries are also provided, but are **not**
recommended for testing, including:
+
- the modified TPC-DS queries with "Decimal-to-Double": [TPC-DS non-decimal
queries](../../gluten-core/src/test/resources/tpcds-queries/tpcds.queries.no-decimal)
(outdated).
## Submit the Spark SQL job
@@ -755,7 +791,7 @@ var parquet_file_path = "/PATH/TO/TPCH_PARQUET_PATH"
var gluten_root = "/PATH/TO/GLUTEN"
```
-Below script shows an example about how to run the testing, you should modify
the parameters such as executor cores, memory, offHeap size based on your
environment.
+The script below shows an example of how to run the test. You should modify parameters such as executor cores, memory, and offHeap size based on your environment.
```bash
export GLUTEN_JAR = /PATH/TO/GLUTEN/package/target/<gluten-jar>
@@ -779,6 +815,7 @@ cat tpch_parquet.scala | spark-shell --name
tpch_powertest_velox \
Refer to [Gluten configuration](../Configuration.md) for more details.
## Result
+
*wholestagetransformer* indicates that the offload works.

@@ -908,7 +945,7 @@ ashProbe: Input: 9 rows (864B, 3 batches), Output: 27 rows
(3.56KB, 3 batches),
# Gluten Implicits
-Gluten provides a helper class to get the fallback summary from a Spark
Dataset.
+Gluten provides a helper class to get the fallback summary from a Spark
Dataset.
```
import org.apache.spark.sql.execution.GlutenImplicits._
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]