This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5894e3679 [VL] Support native UDAF (#5130)
5894e3679 is described below

commit 5894e3679107400c9b155aafb6e5e5c11b6dff16
Author: Rong Ma <[email protected]>
AuthorDate: Mon Apr 1 22:49:33 2024 +0800

    [VL] Support native UDAF (#5130)
---
 backends-velox/pom.xml                             |   4 +-
 .../backendsapi/velox/SparkPlanExecApiImpl.scala   |   6 +-
 .../execution/HashAggregateExecTransformer.scala   |   3 +
 .../apache/spark/sql/expression/UDFResolver.scala  | 109 ++++++++++++-
 .../apache/gluten/expression/VeloxUdfSuite.scala   |  17 +-
 cpp/velox/jni/JniUdf.cc                            |  14 +-
 .../substrait/SubstraitToVeloxPlanValidator.cc     |   5 +-
 cpp/velox/udf/{examples/TestMyUDF.cc => Udaf.h}    |  48 +++---
 cpp/velox/udf/UdfLoader.cc                         | 118 +++++++++-----
 cpp/velox/udf/UdfLoader.h                          |  13 +-
 cpp/velox/udf/examples/CMakeLists.txt              |   7 +-
 cpp/velox/udf/examples/MyUDAF.cc                   | 181 +++++++++++++++++++++
 cpp/velox/udf/examples/{MyUDF.cpp => MyUDF.cc}     |  10 +-
 cpp/velox/udf/examples/TestMyUDF.cc                |  15 +-
 docs/get-started/Velox.md                          | 127 ++++++++++-----
 15 files changed, 541 insertions(+), 136 deletions(-)

diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml
index 6a598e8c0..70b8b901b 100755
--- a/backends-velox/pom.xml
+++ b/backends-velox/pom.xml
@@ -225,7 +225,9 @@
           <junitxml>.</junitxml>
           <tagsToExclude>${tagsToExclude}</tagsToExclude>
           <systemProperties>
-            
<velox.udf.lib.path>${cpp.build.dir}/velox/udf/examples/libmyudf.so</velox.udf.lib.path>
+            <velox.udf.lib.path>
+              
${cpp.build.dir}/velox/udf/examples/libmyudf.so,${cpp.build.dir}/velox/udf/examples/libmyudaf.so
+            </velox.udf.lib.path>
           </systemProperties>
         </configuration>
       </plugin>
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
index 6f6286e5c..b723b865a 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
@@ -50,7 +50,7 @@ import 
org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleEx
 import org.apache.spark.sql.execution.joins.BuildSideRelation
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.utils.ExecUtil
-import org.apache.spark.sql.expression.{UDFExpression, UDFResolver}
+import org.apache.spark.sql.expression.{UDFExpression, UDFResolver, 
UserDefinedAggregateFunction}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -644,7 +644,9 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
     Seq(
       Sig[HLLAdapter](ExpressionNames.APPROX_DISTINCT),
       Sig[UDFExpression](ExpressionNames.UDF_PLACEHOLDER),
-      Sig[NaNvl](ExpressionNames.NANVL))
+      Sig[UserDefinedAggregateFunction](ExpressionNames.UDF_PLACEHOLDER),
+      Sig[NaNvl](ExpressionNames.NANVL)
+    )
   }
 
   override def genInjectedFunctions()
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
index 2a0ac1ff9..d3f9bd78f 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
@@ -31,6 +31,7 @@ import org.apache.gluten.utils.VeloxIntermediateData
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.expression.UserDefinedAggregateFunction
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -710,6 +711,8 @@ object VeloxAggregateFunctionsBuilder {
         if (ignoreNulls) sigName = Some(ExpressionNames.FIRST_IGNORE_NULL)
       case Last(_, ignoreNulls) =>
         if (ignoreNulls) sigName = Some(ExpressionNames.LAST_IGNORE_NULL)
+      case UserDefinedAggregateFunction(name, _, _, _, _) =>
+        sigName = Some(name)
       case _ =>
     }
 
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
index e97450d96..6c8111350 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
@@ -27,10 +27,13 @@ import org.apache.gluten.vectorized.JniWorkspace
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFiles}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExpressionInfo}
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.util.Utils
 
 import com.google.common.collect.Lists
@@ -42,6 +45,33 @@ import java.nio.file.{Files, FileVisitOption, Paths}
 import scala.collection.JavaConverters.asScalaIteratorConverter
 import scala.collection.mutable
 
+case class UserDefinedAggregateFunction(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean,
+    children: Seq[Expression],
+    override val aggBufferAttributes: Seq[AttributeReference])
+  extends AggregateFunction {
+
+  override def aggBufferSchema: StructType =
+    StructType(
+      aggBufferAttributes.map(a => StructField(a.name, a.dataType, a.nullable, 
a.metadata)))
+
+  override val inputAggBufferAttributes: Seq[AttributeReference] =
+    aggBufferAttributes.map(_.newInstance())
+
+  final override def eval(input: InternalRow = null): Any =
+    throw QueryExecutionErrors.cannotEvaluateExpressionError(this)
+
+  final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
+    throw QueryExecutionErrors.cannotGenerateCodeForExpressionError(this)
+
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): Expression = {
+    this.copy(children = newChildren)
+  }
+}
+
 case class UDFExpression(
     name: String,
     dataType: DataType,
@@ -78,9 +108,14 @@ case class UDFExpression(
 
 object UDFResolver extends Logging {
   private val UDFNames = mutable.HashSet[String]()
-
+  // (udf_name, arg1, arg2, ...) => return type
   private val UDFMap = mutable.HashMap[(String, Seq[DataType]), 
ExpressionType]()
 
+  private val UDAFNames = mutable.HashSet[String]()
+  // (udaf_name, arg1, arg2, ...) => return type, intermediate attributes
+  private val UDAFMap =
+    mutable.HashMap[(String, Seq[DataType]), (ExpressionType, 
Seq[AttributeReference])]()
+
   private val LIB_EXTENSION = ".so"
 
   private lazy val isDriver: Boolean =
@@ -103,7 +138,41 @@ object UDFResolver extends Logging {
       (name, 
argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType)),
       returnType)
     UDFNames += name
-    logInfo(s"Registered UDF: $name -> $returnType")
+    logInfo(s"Registered UDF: $name($argTypes) -> $returnType")
+  }
+
+  def registerUDAF(
+      name: String,
+      returnType: Array[Byte],
+      argTypes: Array[Byte],
+      intermediateTypes: Array[Byte]): Unit = {
+    registerUDAF(
+      name,
+      ConverterUtils.parseFromBytes(returnType),
+      ConverterUtils.parseFromBytes(argTypes),
+      ConverterUtils.parseFromBytes(intermediateTypes)
+    )
+  }
+
+  private def registerUDAF(
+      name: String,
+      returnType: ExpressionType,
+      argTypes: ExpressionType,
+      intermediateTypes: ExpressionType): Unit = {
+    assert(argTypes.dataType.isInstanceOf[StructType])
+    assert(intermediateTypes.dataType.isInstanceOf[StructType])
+
+    val aggBufferAttributes =
+      
intermediateTypes.dataType.asInstanceOf[StructType].fields.zipWithIndex.map {
+        case (f, index) =>
+          AttributeReference(s"inter_$index", f.dataType, f.nullable)()
+      }
+    UDAFMap.put(
+      (name, 
argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType)),
+      (returnType, aggBufferAttributes)
+    )
+    UDAFNames += name
+    logInfo(s"Registered UDAF: $name($argTypes) -> $returnType")
   }
 
   def parseName(name: String): (String, String) = {
@@ -241,13 +310,41 @@ object UDFResolver extends Logging {
               new FunctionIdentifier(name),
               new ExpressionInfo(classOf[UDFExpression].getName, name),
               (e: Seq[Expression]) => getUdfExpression(name)(e))
+        }.toSeq ++ UDAFNames.map {
+          name =>
+            (
+              new FunctionIdentifier(name),
+              new 
ExpressionInfo(classOf[UserDefinedAggregateFunction].getName, name),
+              (e: Seq[Expression]) => getUdafExpression(name)(e))
         }.toSeq
     }
   }
 
   private def getUdfExpression(name: String)(children: Seq[Expression]) = {
     val expressionType =
-      UDFMap.getOrElse((name, children.map(_.dataType)), throw new 
IllegalStateException())
+      UDFMap.getOrElse(
+        (name, children.map(_.dataType)),
+        throw new UnsupportedOperationException(
+          s"UDF $name -> ${children.map(_.dataType.simpleString).mkString(", 
")} " +
+            s"is not registered.")
+      )
     UDFExpression(name, expressionType.dataType, expressionType.nullable, 
children)
   }
+
+  private def getUdafExpression(name: String)(children: Seq[Expression]) = {
+    val (expressionType, aggBufferAttributes) =
+      UDAFMap.getOrElse(
+        (name, children.map(_.dataType)),
+        throw new UnsupportedOperationException(
+          s"UDAF $name -> ${children.map(_.dataType.simpleString).mkString(", 
")} " +
+            s"is not registered.")
+      )
+
+    UserDefinedAggregateFunction(
+      name,
+      expressionType.dataType,
+      expressionType.nullable,
+      children,
+      aggBufferAttributes)
+  }
 }
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala
index e12b3df2e..a1c50f610 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala
@@ -79,7 +79,22 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with 
SQLHelper {
                          |  mydate(cast('2024-03-25' as date), 5)
                          |""".stripMargin)
     df.collect()
-    assert(df.collect().sameElements(Array(Row(6, 6L, 105, 
Date.valueOf("2024-03-30")))))
+    assert(
+      df.collect()
+        .sameElements(Array(Row(6, 6L, 105, Date.valueOf("2024-03-30")))))
+  }
+
+  test("test udaf") {
+    val df = spark.sql("""select
+                         |  myavg(1),
+                         |  myavg(1L),
+                         |  myavg(cast(1.0 as float)),
+                         |  myavg(cast(1.0 as double))
+                         |""".stripMargin)
+    df.collect()
+    assert(
+      df.collect()
+        .sameElements(Array(Row(1.0, 1.0, 1.0, 1.0))))
   }
 }
 
diff --git a/cpp/velox/jni/JniUdf.cc b/cpp/velox/jni/JniUdf.cc
index 7fa6c341e..cd5a4f7c8 100644
--- a/cpp/velox/jni/JniUdf.cc
+++ b/cpp/velox/jni/JniUdf.cc
@@ -28,6 +28,7 @@ const std::string kUdfResolverClassPath = 
"Lorg/apache/spark/sql/expression/UDFR
 
 static jclass udfResolverClass;
 static jmethodID registerUDFMethod;
+static jmethodID registerUDAFMethod;
 
 } // namespace
 
@@ -41,6 +42,7 @@ void gluten::initVeloxJniUDF(JNIEnv* env) {
 
   // methods
   registerUDFMethod = getMethodIdOrError(env, udfResolverClass, "registerUDF", 
"(Ljava/lang/String;[B[B)V");
+  registerUDAFMethod = getMethodIdOrError(env, udfResolverClass, 
"registerUDAF", "(Ljava/lang/String;[B[B[B)V");
 }
 
 void gluten::finalizeVeloxJniUDF(JNIEnv* env) {
@@ -61,7 +63,17 @@ void gluten::jniGetFunctionSignatures(JNIEnv* env) {
         argTypes, 0, signature->argTypes.length(), reinterpret_cast<const 
jbyte*>(signature->argTypes.c_str()));
     jobject instance = env->GetStaticObjectField(
         udfResolverClass, env->GetStaticFieldID(udfResolverClass, "MODULE$", 
kUdfResolverClassPath.c_str()));
-    env->CallVoidMethod(instance, registerUDFMethod, name, returnType, 
argTypes);
+    if (!signature->intermediateType.empty()) {
+      jbyteArray intermediateType = 
env->NewByteArray(signature->intermediateType.length());
+      env->SetByteArrayRegion(
+          intermediateType,
+          0,
+          signature->intermediateType.length(),
+          reinterpret_cast<const jbyte*>(signature->intermediateType.c_str()));
+      env->CallVoidMethod(instance, registerUDAFMethod, name, returnType, 
argTypes, intermediateType);
+    } else {
+      env->CallVoidMethod(instance, registerUDFMethod, name, returnType, 
argTypes);
+    }
     checkException(env);
   }
 }
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 0fd767f6a..8401e7d8a 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -20,6 +20,7 @@
 #include <re2/re2.h>
 #include <string>
 #include "TypeUtils.h"
+#include "udf/UdfLoader.h"
 #include "utils/Common.h"
 #include "velox/core/ExpressionEvaluator.h"
 #include "velox/exec/Aggregate.h"
@@ -1148,9 +1149,11 @@ bool SubstraitToVeloxPlanValidator::validate(const 
::substrait::AggregateRel& ag
       "skewness",
       "kurtosis"};
 
+  auto udfFuncs = UdfLoader::getInstance()->getRegisteredUdafNames();
+
   for (const auto& funcSpec : funcSpecs) {
     auto funcName = SubstraitParser::getNameBeforeDelimiter(funcSpec);
-    if (supportedAggFuncs.find(funcName) == supportedAggFuncs.end()) {
+    if (supportedAggFuncs.find(funcName) == supportedAggFuncs.end() && 
udfFuncs.find(funcName) == udfFuncs.end()) {
       LOG_VALIDATION_MSG(funcName + " was not supported in AggregateRel.");
       return false;
     }
diff --git a/cpp/velox/udf/examples/TestMyUDF.cc b/cpp/velox/udf/Udaf.h
similarity index 52%
copy from cpp/velox/udf/examples/TestMyUDF.cc
copy to cpp/velox/udf/Udaf.h
index e01c2f2c9..7e8f03402 100644
--- a/cpp/velox/udf/examples/TestMyUDF.cc
+++ b/cpp/velox/udf/Udaf.h
@@ -15,28 +15,26 @@
  * limitations under the License.
  */
 
-#include <iostream>
-#include "udf/UdfLoader.h"
-
-#include "velox/expression/VectorFunction.h"
-
-int main() {
-  auto udfLoader = gluten::UdfLoader::getInstance();
-  udfLoader->loadUdfLibraries("libmyudf.so");
-  udfLoader->registerUdf();
-
-  auto map = facebook::velox::exec::vectorFunctionFactories();
-  const std::string funcName = "myudf1";
-  auto f = map.withRLock([&funcName](auto& self) -> 
std::shared_ptr<facebook::velox::exec::VectorFunction> {
-    auto iter = self.find(funcName);
-    std::unordered_map<std::string, std::string> values;
-    const facebook::velox::core::QueryConfig config(std::move(values));
-    return iter->second.factory(funcName, {}, config);
-  });
-
-  if (!f) {
-    return 1;
-  }
-
-  return 0;
-}
+#pragma once
+
+namespace gluten {
+
+struct UdafEntry {
+  const char* name;
+  const char* dataType;
+
+  size_t numArgs;
+  const char** argTypes;
+
+  const char* intermediateType{nullptr};
+};
+
+#define GLUTEN_GET_NUM_UDAF getNumUdaf
+#define DEFINE_GET_NUM_UDAF extern "C" int GLUTEN_GET_NUM_UDAF()
+
+#define GLUTEN_GET_UDAF_ENTRIES getUdafEntries // UDAF-specific symbol name, parallel to getNumUdaf
+#define DEFINE_GET_UDAF_ENTRIES extern "C" void 
GLUTEN_GET_UDAF_ENTRIES(gluten::UdafEntry* udafEntries)
+
+#define GLUTEN_REGISTER_UDAF registerUdf
+#define DEFINE_REGISTER_UDAF extern "C" void GLUTEN_REGISTER_UDAF()
+} // namespace gluten
diff --git a/cpp/velox/udf/UdfLoader.cc b/cpp/velox/udf/UdfLoader.cc
index 3025b7388..a8a99ce9f 100644
--- a/cpp/velox/udf/UdfLoader.cc
+++ b/cpp/velox/udf/UdfLoader.cc
@@ -17,12 +17,12 @@
 
 #include <dlfcn.h>
 #include <google/protobuf/arena.h>
-#include <velox/expression/SignatureBinder.h>
-#include <velox/expression/VectorFunction.h>
-#include <velox/type/fbhive/HiveTypeParser.h>
-#include <functional>
 #include <vector>
+#include "velox/expression/SignatureBinder.h"
+#include "velox/expression/VectorFunction.h"
+#include "velox/type/fbhive/HiveTypeParser.h"
 
+#include "Udaf.h"
 #include "Udf.h"
 #include "UdfLoader.h"
 #include "utils/StringUtil.h"
@@ -31,9 +31,13 @@
 
 namespace {
 
-void* loadSymFromLibrary(void* handle, const std::string& libPath, const 
std::string& func) {
+void* loadSymFromLibrary(
+    void* handle,
+    const std::string& libPath,
+    const std::string& func,
+    bool throwIfNotFound = true) {
   void* sym = dlsym(handle, func.c_str());
-  if (!sym) {
+  if (!sym && throwIfNotFound) {
     throw gluten::GlutenException(func + " not found in " + libPath);
   }
   return sym;
@@ -59,29 +63,79 @@ void UdfLoader::loadUdfLibraries0(const 
std::vector<std::string>& libPaths) {
 }
 
 std::unordered_set<std::shared_ptr<UdfLoader::UdfSignature>> 
UdfLoader::getRegisteredUdfSignatures() {
-  std::unordered_set<std::shared_ptr<UdfSignature>> signatures;
+  if (!signatures_.empty()) {
+    return signatures_;
+  }
   for (const auto& item : handles_) {
     const auto& libPath = item.first;
     const auto& handle = item.second;
-    void* getNumUdfSym = loadSymFromLibrary(handle, libPath, 
GLUTEN_TOSTRING(GLUTEN_GET_NUM_UDF));
-    auto getNumUdf = reinterpret_cast<int (*)()>(getNumUdfSym);
-    // allocate
-    int numUdf = getNumUdf();
-    UdfEntry* udfEntry = static_cast<UdfEntry*>(malloc(sizeof(UdfEntry) * 
numUdf));
-
-    void* getUdfEntriesSym = loadSymFromLibrary(handle, libPath, 
GLUTEN_TOSTRING(GLUTEN_GET_UDF_ENTRIES));
-    auto getUdfEntries = reinterpret_cast<void 
(*)(UdfEntry*)>(getUdfEntriesSym);
-    getUdfEntries(udfEntry);
-
-    for (auto i = 0; i < numUdf; ++i) {
-      const auto& entry = udfEntry[i];
-      auto dataType = toSubstraitTypeStr(entry.dataType);
-      auto argTypes = toSubstraitTypeStr(entry.numArgs, entry.argTypes);
-      signatures.insert(std::make_shared<UdfSignature>(entry.name, dataType, 
argTypes));
+
+    // Handle UDFs.
+    void* getNumUdfSym = loadSymFromLibrary(handle, libPath, 
GLUTEN_TOSTRING(GLUTEN_GET_NUM_UDF), false);
+    if (getNumUdfSym) {
+      auto getNumUdf = reinterpret_cast<int (*)()>(getNumUdfSym);
+      int numUdf = getNumUdf();
+      // allocate
+      UdfEntry* udfEntries = static_cast<UdfEntry*>(malloc(sizeof(UdfEntry) * 
numUdf));
+
+      void* getUdfEntriesSym = loadSymFromLibrary(handle, libPath, 
GLUTEN_TOSTRING(GLUTEN_GET_UDF_ENTRIES));
+      auto getUdfEntries = reinterpret_cast<void 
(*)(UdfEntry*)>(getUdfEntriesSym);
+      getUdfEntries(udfEntries);
+
+      for (auto i = 0; i < numUdf; ++i) {
+        const auto& entry = udfEntries[i];
+        auto dataType = toSubstraitTypeStr(entry.dataType);
+        auto argTypes = toSubstraitTypeStr(entry.numArgs, entry.argTypes);
+        signatures_.insert(std::make_shared<UdfSignature>(entry.name, 
dataType, argTypes));
+      }
+      free(udfEntries);
+    } else {
+      LOG(INFO) << "No UDFs found in " << libPath;
+    }
+
+    // Handle UDAFs.
+    void* getNumUdafSym = loadSymFromLibrary(handle, libPath, 
GLUTEN_TOSTRING(GLUTEN_GET_NUM_UDAF), false);
+    if (getNumUdafSym) {
+      auto getNumUdaf = reinterpret_cast<int (*)()>(getNumUdafSym);
+      int numUdaf = getNumUdaf();
+      // allocate
+      UdafEntry* udafEntries = 
static_cast<UdafEntry*>(malloc(sizeof(UdafEntry) * numUdaf));
+
+      void* getUdafEntriesSym = loadSymFromLibrary(handle, libPath, 
GLUTEN_TOSTRING(GLUTEN_GET_UDAF_ENTRIES));
+      auto getUdafEntries = reinterpret_cast<void 
(*)(UdafEntry*)>(getUdafEntriesSym);
+      getUdafEntries(udafEntries);
+
+      for (auto i = 0; i < numUdaf; ++i) {
+        const auto& entry = udafEntries[i];
+        auto dataType = toSubstraitTypeStr(entry.dataType);
+        auto argTypes = toSubstraitTypeStr(entry.numArgs, entry.argTypes);
+        auto intermediateType = toSubstraitTypeStr(entry.intermediateType);
+        signatures_.insert(std::make_shared<UdfSignature>(entry.name, 
dataType, argTypes, intermediateType));
+      }
+      free(udafEntries);
+    } else {
+      LOG(INFO) << "No UDAFs found in " << libPath;
     }
-    free(udfEntry);
   }
-  return signatures;
+  return signatures_;
+}
+
+std::unordered_set<std::string> UdfLoader::getRegisteredUdafNames() {
+  if (handles_.empty()) {
+    return {};
+  }
+  if (!names_.empty()) {
+    return names_;
+  }
+  if (signatures_.empty()) {
+    getRegisteredUdfSignatures();
+  }
+  for (const auto& sig : signatures_) {
+    if (!sig->intermediateType.empty()) {
+      names_.insert(sig->name);
+    }
+  }
+  return names_;
 }
 
 void UdfLoader::registerUdf() {
@@ -92,23 +146,9 @@ void UdfLoader::registerUdf() {
   }
 }
 
-bool UdfLoader::validateUdf(const std::string& name, const 
std::vector<facebook::velox::TypePtr>& argTypes) {
-  const auto& functionMap = facebook::velox::exec::vectorFunctionFactories();
-  auto got = functionMap->find(name);
-  if (got != functionMap->end()) {
-    const auto& entry = got->second;
-    for (auto& signature : entry.signatures) {
-      facebook::velox::exec::SignatureBinder binder(*signature, argTypes);
-      if (binder.tryBind()) {
-        return true;
-      }
-    }
-  }
-  return false;
-}
-
 std::shared_ptr<UdfLoader> UdfLoader::getInstance() {
   static auto instance = std::make_shared<UdfLoader>();
   return instance;
 }
+
 } // namespace gluten
diff --git a/cpp/velox/udf/UdfLoader.h b/cpp/velox/udf/UdfLoader.h
index 8e2004853..31098d2f4 100644
--- a/cpp/velox/udf/UdfLoader.h
+++ b/cpp/velox/udf/UdfLoader.h
@@ -34,9 +34,14 @@ class UdfLoader {
     std::string returnType;
     std::string argTypes;
 
+    std::string intermediateType{};
+
     UdfSignature(std::string name, std::string returnType, std::string 
argTypes)
         : name(name), returnType(returnType), argTypes(argTypes) {}
 
+    UdfSignature(std::string name, std::string returnType, std::string 
argTypes, std::string intermediateType)
+        : name(name), returnType(returnType), argTypes(argTypes), 
intermediateType(intermediateType) {}
+
     ~UdfSignature() = default;
   };
 
@@ -46,10 +51,9 @@ class UdfLoader {
 
   std::unordered_set<std::shared_ptr<UdfSignature>> 
getRegisteredUdfSignatures();
 
-  void registerUdf();
+  std::unordered_set<std::string> getRegisteredUdafNames();
 
-  // unused
-  bool validateUdf(const std::string& name, const 
std::vector<facebook::velox::TypePtr>& argTypes);
+  void registerUdf();
 
  private:
   void loadUdfLibraries0(const std::vector<std::string>& libPaths);
@@ -81,5 +85,8 @@ class UdfLoader {
   facebook::velox::type::fbhive::HiveTypeParser parser_{};
   google::protobuf::Arena arena_{};
   VeloxToSubstraitTypeConvertor convertor_{};
+
+  std::unordered_set<std::shared_ptr<UdfSignature>> signatures_;
+  std::unordered_set<std::string> names_;
 };
 } // namespace gluten
diff --git a/cpp/velox/udf/examples/CMakeLists.txt 
b/cpp/velox/udf/examples/CMakeLists.txt
index 20fd9c54e..3c51859ee 100644
--- a/cpp/velox/udf/examples/CMakeLists.txt
+++ b/cpp/velox/udf/examples/CMakeLists.txt
@@ -13,8 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-add_library(myudf SHARED "MyUDF.cpp")
+add_library(myudf SHARED "MyUDF.cc")
 target_link_libraries(myudf velox)
 
+add_library(myudaf SHARED "MyUDAF.cc")
+target_link_libraries(myudaf velox)
+
 add_executable(test_myudf "TestMyUDF.cc")
-target_link_libraries(test_myudf myudf)
+target_link_libraries(test_myudf velox)
diff --git a/cpp/velox/udf/examples/MyUDAF.cc b/cpp/velox/udf/examples/MyUDAF.cc
new file mode 100644
index 000000000..e6c4b1fea
--- /dev/null
+++ b/cpp/velox/udf/examples/MyUDAF.cc
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <velox/exec/SimpleAggregateAdapter.h>
+#include <velox/expression/VectorFunction.h>
+#include <velox/functions/Macros.h>
+#include <velox/functions/Registerer.h>
+#include <velox/functions/lib/aggregates/AverageAggregateBase.h>
+#include <iostream>
+#include "udf/Udaf.h"
+
+using namespace facebook::velox;
+using namespace facebook::velox::exec;
+
+namespace {
+
+static const char* kInteger = "int";
+static const char* kBigInt = "bigint";
+static const char* kFloat = "float";
+static const char* kDouble = "double";
+
+// Copied from velox/exec/tests/SimpleAverageAggregate.cpp
+
+// Implementation of the average aggregation function through the
+// SimpleAggregateAdapter.
+template <typename T>
+class AverageAggregate {
+ public:
+  // Type(s) of input vector(s) wrapped in Row.
+  using InputType = Row<T>;
+
+  // Type of intermediate result vector wrapped in Row.
+  using IntermediateType =
+      Row</*sum*/ double,
+          /*count*/ int64_t>;
+
+  // Type of output vector.
+  using OutputType = std::conditional_t<std::is_same_v<T, float>, float, 
double>;
+
+  static bool toIntermediate(exec::out_type<Row<double, int64_t>>& out, 
exec::arg_type<T> in) {
+    out.copy_from(std::make_tuple(static_cast<double>(in), 1));
+    return true;
+  }
+
+  struct AccumulatorType {
+    double sum_;
+    int64_t count_;
+
+    AccumulatorType() = delete;
+
+    // Constructor used in initializeNewGroups().
+    explicit AccumulatorType(HashStringAllocator* /*allocator*/) {
+      sum_ = 0;
+      count_ = 0;
+    }
+
+    // addInput expects one parameter of exec::arg_type<T> for each child-type 
T
+    // wrapped in InputType.
+    void addInput(HashStringAllocator* /*allocator*/, exec::arg_type<T> data) {
+      sum_ += data;
+      count_ = checkedPlus<int64_t>(count_, 1);
+    }
+
+    // combine expects one parameter of exec::arg_type<IntermediateType>.
+    void combine(HashStringAllocator* /*allocator*/, 
exec::arg_type<Row<double, int64_t>> other) {
+      // Both fields of an intermediate result should be non-null because
+      // writeIntermediateResult() never makes an intermediate result with a
+      // single null.
+      VELOX_CHECK(other.at<0>().has_value());
+      VELOX_CHECK(other.at<1>().has_value());
+      sum_ += other.at<0>().value();
+      count_ = checkedPlus<int64_t>(count_, other.at<1>().value());
+    }
+
+    bool writeFinalResult(exec::out_type<OutputType>& out) {
+      out = sum_ / count_;
+      return true;
+    }
+
+    bool writeIntermediateResult(exec::out_type<IntermediateType>& out) {
+      out = std::make_tuple(sum_, count_);
+      return true;
+    }
+  };
+};
+
+exec::AggregateRegistrationResult registerSimpleAverageAggregate(const 
std::string& name) {
+  std::vector<std::shared_ptr<exec::AggregateFunctionSignature>> signatures;
+
+  for (const auto& inputType : {"smallint", "integer", "bigint", "double"}) {
+    signatures.push_back(exec::AggregateFunctionSignatureBuilder()
+                             .returnType("double")
+                             .intermediateType("row(double,bigint)")
+                             .argumentType(inputType)
+                             .build());
+  }
+
+  signatures.push_back(exec::AggregateFunctionSignatureBuilder()
+                           .returnType("real")
+                           .intermediateType("row(double,bigint)")
+                           .argumentType("real")
+                           .build());
+
+  return exec::registerAggregateFunction(
+      name,
+      std::move(signatures),
+      [name](
+          core::AggregationNode::Step step,
+          const std::vector<TypePtr>& argTypes,
+          const TypePtr& resultType,
+          const core::QueryConfig& /*config*/) -> 
std::unique_ptr<exec::Aggregate> {
+        VELOX_CHECK_LE(argTypes.size(), 1, "{} takes at most one argument", 
name);
+        auto inputType = argTypes[0];
+        if (exec::isRawInput(step)) {
+          switch (inputType->kind()) {
+            case TypeKind::SMALLINT:
+              return 
std::make_unique<SimpleAggregateAdapter<AverageAggregate<int16_t>>>(resultType);
+            case TypeKind::INTEGER:
+              return 
std::make_unique<SimpleAggregateAdapter<AverageAggregate<int32_t>>>(resultType);
+            case TypeKind::BIGINT:
+              return 
std::make_unique<SimpleAggregateAdapter<AverageAggregate<int64_t>>>(resultType);
+            case TypeKind::REAL:
+              return 
std::make_unique<SimpleAggregateAdapter<AverageAggregate<float>>>(resultType);
+            case TypeKind::DOUBLE:
+              return 
std::make_unique<SimpleAggregateAdapter<AverageAggregate<double>>>(resultType);
+            default:
+              VELOX_FAIL("Unknown input type for {} aggregation {}", name, 
inputType->kindName());
+          }
+        } else {
+          switch (resultType->kind()) {
+            case TypeKind::REAL:
+              return 
std::make_unique<SimpleAggregateAdapter<AverageAggregate<float>>>(resultType);
+            case TypeKind::DOUBLE:
+            case TypeKind::ROW:
+              return 
std::make_unique<SimpleAggregateAdapter<AverageAggregate<double>>>(resultType);
+            default:
+              VELOX_FAIL("Unsupported result type for final aggregation: {}", 
resultType->kindName());
+          }
+        }
+      },
+      true /*registerCompanionFunctions*/,
+      true /*overwrite*/);
+}
+} // namespace
+
+const int kNumMyUdaf = 4;
+
+DEFINE_GET_NUM_UDAF {
+  return kNumMyUdaf;
+}
+
+const char* myAvgArg1[] = {kInteger};
+const char* myAvgArg2[] = {kBigInt};
+const char* myAvgArg3[] = {kFloat};
+const char* myAvgArg4[] = {kDouble};
+const char* myAvgIntermediateType = "struct<a:double,b:bigint>";
+DEFINE_GET_UDAF_ENTRIES {
+  int index = 0;
+  udafEntries[index++] = {"myavg", kDouble, 1, myAvgArg1, 
myAvgIntermediateType};
+  udafEntries[index++] = {"myavg", kDouble, 1, myAvgArg2, 
myAvgIntermediateType};
+  udafEntries[index++] = {"myavg", kDouble, 1, myAvgArg3, 
myAvgIntermediateType};
+  udafEntries[index++] = {"myavg", kDouble, 1, myAvgArg4, 
myAvgIntermediateType};
+}
+
+DEFINE_REGISTER_UDAF {
+  registerSimpleAverageAggregate("myavg");
+}
diff --git a/cpp/velox/udf/examples/MyUDF.cpp b/cpp/velox/udf/examples/MyUDF.cc
similarity index 93%
rename from cpp/velox/udf/examples/MyUDF.cpp
rename to cpp/velox/udf/examples/MyUDF.cc
index 5115a2d12..578e3effb 100644
--- a/cpp/velox/udf/examples/MyUDF.cpp
+++ b/cpp/velox/udf/examples/MyUDF.cc
@@ -24,6 +24,7 @@
 namespace {
 
 using namespace facebook::velox;
+using namespace facebook::velox::exec;
 
 static const char* kInteger = "int";
 static const char* kBigInt = "bigint";
@@ -118,10 +119,11 @@ const char* myUdf1Arg2[] = {kBigInt};
 const char* myUdf2Arg1[] = {kBigInt};
 const char* myDateArg[] = {kDate, kInteger};
 DEFINE_GET_UDF_ENTRIES {
-  udfEntries[0] = {"myudf1", kInteger, 1, myUdf1Arg1};
-  udfEntries[1] = {"myudf1", kBigInt, 1, myUdf1Arg2};
-  udfEntries[2] = {"myudf2", kBigInt, 1, myUdf2Arg1};
-  udfEntries[3] = {"mydate", kDate, 2, myDateArg};
+  int index = 0;
+  udfEntries[index++] = {"myudf1", kInteger, 1, myUdf1Arg1};
+  udfEntries[index++] = {"myudf1", kBigInt, 1, myUdf1Arg2};
+  udfEntries[index++] = {"myudf2", kBigInt, 1, myUdf2Arg1};
+  udfEntries[index++] = {"mydate", kDate, 2, myDateArg};
 }
 
 DEFINE_REGISTER_UDF {
diff --git a/cpp/velox/udf/examples/TestMyUDF.cc 
b/cpp/velox/udf/examples/TestMyUDF.cc
index e01c2f2c9..794dd0613 100644
--- a/cpp/velox/udf/examples/TestMyUDF.cc
+++ b/cpp/velox/udf/examples/TestMyUDF.cc
@@ -26,12 +26,15 @@ int main() {
   udfLoader->registerUdf();
 
   auto map = facebook::velox::exec::vectorFunctionFactories();
-  const std::string funcName = "myudf1";
-  auto f = map.withRLock([&funcName](auto& self) -> 
std::shared_ptr<facebook::velox::exec::VectorFunction> {
-    auto iter = self.find(funcName);
-    std::unordered_map<std::string, std::string> values;
-    const facebook::velox::core::QueryConfig config(std::move(values));
-    return iter->second.factory(funcName, {}, config);
+  const std::vector<std::string> candidates = {"myudf1", "myudf2"};
+  auto f = map.withRLock([&candidates](auto& self) -> bool {
+    return std::all_of(candidates.begin(), candidates.end(), [&](const auto& 
funcName) {
+      auto iter = self.find(funcName);
+      std::unordered_map<std::string, std::string> values;
+      const facebook::velox::core::QueryConfig config(std::move(values));
+      return iter->second.factory(
+                 funcName, 
{facebook::velox::exec::VectorFunctionArg{facebook::velox::BIGINT()}}, config) 
!= nullptr;
+    });
   });
 
   if (!f) {
diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md
index df0c6ec05..156c94694 100644
--- a/docs/get-started/Velox.md
+++ b/docs/get-started/Velox.md
@@ -4,6 +4,7 @@ title: Gluten with Velox Backend
 nav_order: 1
 parent: Getting-Started
 ---
+
 # Supported Version
 
 | Type  | Version                         |
@@ -15,10 +16,10 @@ parent: Getting-Started
 
 # Prerequisite
 
-Currently, Gluten+Velox backend is only tested on 
**Ubuntu20.04/Ubuntu22.04/Centos7/Centos8**. Other kinds of OS support are 
still in progress. The long term goal is to support several
-common OS and conda env deployment.
+Currently, Gluten+Velox backend is only tested on 
**Ubuntu20.04/Ubuntu22.04/Centos7/Centos8**.
+Other kinds of OS support are still in progress. The long term goal is to 
support several common OS and conda env deployment.
 
-Gluten only fully tested in CI with 3.2.2, 3.3.1 and 3.4.2. We will add/update 
supported/tested versions according to the upstream changes. 
+Gluten is only fully tested in CI with 3.2.2, 3.3.1 and 3.4.2. We will add/update 
supported/tested versions according to the upstream changes.
 
 We need to set up the `JAVA_HOME` env. Currently, Gluten supports **java 8** 
and **java 17**.
 
@@ -120,6 +121,7 @@ With config `enable_vcpkg=OFF`, the dependency libraries 
won't be statically lin
 Hadoop hdfs support is ready via the 
[libhdfs3](https://github.com/apache/hawq/tree/master/depends/libhdfs3) 
library. The libhdfs3 provides native API for Hadoop I/O without the drawbacks 
of JNI. It also provides advanced authentication like Kerberos based. Please 
note this library has several dependencies which may require extra 
installations on Driver and Worker node.
 
 ### Build with HDFS support
+
 To build Gluten with HDFS support, below command is suggested:
 
 ```bash
@@ -128,7 +130,7 @@ cd /path/to/gluten
 ```
 
 ### Configuration about HDFS support
- 
+
 HDFS uris (hdfs://host:port) will be extracted from a valid hdfs file path to 
initialize hdfs client, you do not need to specify it explicitly.
 
 libhdfs3 need a configuration file and [example 
here](https://github.com/apache/hawq/blob/e9d43144f7e947e071bba48871af9da354d177d0/src/backend/utils/misc/etc/hdfs-client.xml),
 this file is a bit different from hdfs-site.xml and core-site.xml.
@@ -159,12 +161,12 @@ You also need to add configuration to the "hdfs-site.xml" 
as below:
 
 ```
 <property>
-   <name>dfs.client.read.shortcircuit</name>
-   <value>true</value>
+  <name>dfs.client.read.shortcircuit</name>
+  <value>true</value>
 </property>
 <property>
-   <name>dfs.domain.socket.path</name>
-   <value>/var/lib/hadoop-hdfs/dn_socket</value>
+  <name>dfs.domain.socket.path</name>
+  <value>/var/lib/hadoop-hdfs/dn_socket</value>
 </property>
 ```
 
@@ -174,10 +176,10 @@ Here are two steps to enable kerberos.
 
 - Make sure the hdfs-client.xml contains
 
-```xml
+```
 <property>
-    <name>hadoop.security.authentication</name>
-    <value>kerberos</value>
+  <name>hadoop.security.authentication</name>
+  <value>kerberos</value>
 </property>
 ```
 
@@ -290,6 +292,7 @@ The gluten-iceberg jar is in `gluten-iceberg/target` 
directory.
 After the two steps, you can query iceberg table by gluten/velox without 
scan's fallback.
 
 # Coverage
+
 Spark3.3 has 387 functions in total. ~240 are commonly used. Velox's functions 
have two category, Presto and Spark. Presto has 124 functions implemented. 
Spark has 62 functions. Spark functions are verified to have the same result as 
Vanilla Spark. Some Presto functions have the same result as Vanilla Spark but 
some others have different. Gluten prefer to use Spark functions firstly. If 
it's not in Spark's list but implemented in Presto, we currently offload to 
Presto one until we noted  [...]
 
 > Velox doesn't support [ANSI 
 > mode](https://spark.apache.org/docs/latest/sql-ref-ansi-compliance.html)), 
 > so as Gluten. Once ANSI mode is enabled in Spark config, Gluten will 
 > fallback to Vanilla Spark.
@@ -347,32 +350,45 @@ Using the following configuration options to customize 
spilling:
 | spark.gluten.sql.columnar.backend.velox.spillPartitionBits               | 2 
            | The number of bits used to calculate the spilling partition 
number. The number of spilling partitions will be power of two                  
                                      |
 | spark.gluten.sql.columnar.backend.velox.spillableReservationGrowthPct    | 
25            | The spillable memory reservation growth percentage of the 
previous memory reservation size                                                
                                        |
 | spark.gluten.sql.columnar.backend.velox.spillThreadNum                   | 0 
            | (Experimental) The thread num of a dedicated thread pool to do 
spill
+
 # Velox User-Defined Functions (UDF)
 
 ## Introduction
 
-Velox backend supports User-Defined Functions (UDF). Users can create their 
own functions using the UDF interface provided in Velox backend and build 
libraries for these functions. At runtime, the UDF are registered at the start 
of applications. Once registered, Gluten will be able to parse and offload 
these UDF into Velox during execution.
+Velox backend supports User-Defined Functions (UDF) and User-Defined Aggregate 
Functions (UDAF).
+Users can create their own functions using the UDF interface provided in Velox 
backend and build libraries for these functions.
+At runtime, the UDF are registered at the start of applications.
+Once registered, Gluten will be able to parse and offload these UDF into Velox 
during execution.
 
-## Creating a UDF library
+## Create and Build UDF/UDAF library
 
 The following steps demonstrate how to set up a UDF library project:
 
-- **Include the UDF Interface Header:** First, include the UDF interface 
header file [Udf.h](../../cpp/velox/udf/Udf.h) in the project file. The header 
file defines the `UdfEntry` struct, along with the macros for declaring the 
necessary functions to integrate the UDF into Gluten and Velox.
+- **Include the UDF Interface Header:**
+  First, include the UDF interface header file 
[Udf.h](../../cpp/velox/udf/Udf.h) in the project file.
+  The header file defines the `UdfEntry` struct, along with the macros for 
declaring the necessary functions to integrate the UDF into Gluten and Velox.
 
-- **Implement the UDF:** Implement UDF. These functions should be able to 
register to Velox.
+- **Implement the UDF:**
+  Implement UDF. These functions should be able to register to Velox.
 
-- **Implement the Interface Functions:** Implement the following interface 
functions that integrate UDF into Project Gluten:
+- **Implement the Interface Functions:**
+  Implement the following interface functions that integrate UDF into Project 
Gluten:
 
-  - `getNumUdf()`: This function should return the number of UDF in the 
library. This is used to allocating udfEntries array as the argument for the 
next function `getUdfEntries`.
+  - `getNumUdf()`:
+    This function should return the number of UDF in the library.
+    This is used to allocate the udfEntries array as the argument for the next 
function `getUdfEntries`.
 
-  - `getUdfEntries(gluten::UdfEntry* udfEntries)`: This function should 
populate the provided udfEntries array with the details of the UDF, including 
function names and signatures.
+  - `getUdfEntries(gluten::UdfEntry* udfEntries)`:
+    This function should populate the provided udfEntries array with the 
details of the UDF, including function names and signatures.
 
-  - `registerUdf()`: This function is called to register the UDF to Velox 
function registry. This is where users should register functions by calling 
`facebook::velox::exec::registerVecotorFunction` or other Velox APIs.
+  - `registerUdf()`:
+    This function is called to register the UDF to Velox function registry.
+    This is where users should register functions by calling 
`facebook::velox::exec::registerVectorFunction` or other Velox APIs.
 
-  - The interface functions are mapping to marcos in 
[Udf.h](../../cpp/velox/udf/Udf.h). Here's an example of how to implement these 
functions:
+  - The interface functions are mapped to macros in 
[Udf.h](../../cpp/velox/udf/Udf.h). Here's an example of how to implement these 
functions:
 
   ```
-  // Filename MyUDF.cpp
+  // Filename MyUDF.cc
 
   #include <velox/expression/VectorFunction.h>
   #include <velox/udf/Udf.h>
@@ -409,9 +425,9 @@ The following steps demonstrate how to set up a UDF library 
project:
 
   ```
 
-## Building the UDF library
+To build the UDF library, users need to compile the C++ code and link to 
`libvelox.so`.
+It's recommended to create a CMakeLists.txt for the project. Here's an example:
 
-To build the UDF library, users need to compile the C++ code and link to 
`libvelox.so`. It's recommended to create a CMakeLists.txt for the project. 
Here's an example:
 ```
 project(myudf)
 
@@ -428,13 +444,25 @@ target_include_directories(myudf PRIVATE 
${GLUTEN_HOME}/cpp ${GLUTEN_HOME}/ep/bu
 target_link_libraries(myudf PRIVATE ${VELOX_LIBRARY})
 ```
 
-## Using UDF in Gluten
+The steps for creating and building a UDAF library are quite similar to those 
for a UDF library.
+The major difference lies in including and defining specific functions within 
the UDAF header file [Udaf.h](../../cpp/velox/udf/Udaf.h):
 
-Gluten loads the UDF libraries at runtime. You can upload UDF libraries via 
`--files` or `--archives`, and configure the libray paths using the provided 
Spark configuration, which accepts comma separated list of library paths.
+- `getNumUdaf()`
+- `getUdafEntries(gluten::UdafEntry* udafEntries)`
+- `registerUdaf()`
 
-Note if running on Yarn client mode, the uploaded files are not reachable on 
driver side. Users should copy those files to somewhere reachable for driver 
and set `spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths`. This 
configuration is also useful when the `udfLibraryPaths` is different between 
driver side and executor side.
+`gluten::UdafEntry` requires an additional field `intermediateType`, to 
specify the output type from partial aggregation.
+For detailed implementation, you can refer to the example code in 
[MyUDAF.cc](../../cpp/velox/udf/examples/MyUDAF.cc)
+
+## Using UDF/UDAF in Gluten
+
+Gluten loads the UDF libraries at runtime. You can upload UDF libraries via 
`--files` or `--archives`, and configure the library paths using the provided 
Spark configuration, which accepts a comma-separated list of library paths.
+
+Note if running on Yarn client mode, the uploaded files are not reachable on 
driver side. Users should copy those files to somewhere reachable for driver 
and set `spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths`.
+This configuration is also useful when the `udfLibraryPaths` is different 
between driver side and executor side.
 
 - Use the `--files` option to upload a library and configure its relative path
+
 ```shell
 --files /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
 --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=libmyudf.so
@@ -443,6 +471,7 @@ Note if running on Yarn client mode, the uploaded files are 
not reachable on dri
 ```
 
 - Use the `--archives` option to upload an archive and configure its relative 
path
+
 ```shell
 --archives /path/to/udf_archives.zip#udf_archives
 --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=udf_archives
@@ -450,34 +479,42 @@ Note if running on Yarn client mode, the uploaded files 
are not reachable on dri
 --conf 
spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/udf_archives.zip
 ```
 
-- Only configure URI
+- Configure URI
 
 You can also specify the local or HDFS URIs to the UDF libraries or archives. 
Local URIs should exist on driver and every worker nodes.
+
 ```shell
---conf 
spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=hdfs://path/to/library_or_archive
+--conf 
spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=file:///path/to/library_or_archive
 ```
 
 ## Try the example
 
-We provided an Velox UDF example file 
[MyUDF.cpp](../../cpp/velox/udf/examples/MyUDF.cpp). After building gluten cpp, 
you can find the example library at 
/path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
+We provided Velox UDF examples in file 
[MyUDF.cc](../../cpp/velox/udf/examples/MyUDF.cc) and UDAF examples in file 
[MyUDAF.cc](../../cpp/velox/udf/examples/MyUDAF.cc).
+After building gluten cpp, you can find the example libraries at 
/path/to/gluten/cpp/build/velox/udf/examples/
+
+Start spark-shell or spark-sql with below configuration
 
-Start spark-shell or spark-sql with below configuration 
 ```shell
 # Use the `--files` option to upload a library and configure its relative path
 --files /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
 --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=libmyudf.so
 ```
+
 or
+
 ```shell
 # Only configure URI
 --conf 
spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=file:///path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
 ```
 
 Run query. The functions `myudf1` and `myudf2` increment the input value by a 
constant of 5
+
 ```
 select myudf1(1), myudf2(100L)
 ```
+
 The output from spark-shell will be like
+
 ```
 +----------------+------------------+
 |udfexpression(1)|udfexpression(100)|
@@ -518,6 +555,7 @@ Gluten supports using Intel® QuickAssist Technology (QAT) 
for data compression
 This feature is based on QAT driver library and 
[QATzip](https://github.com/intel/QATzip) library. Please manually download QAT 
driver for your system, and follow its README to build and install on all 
Driver and Worker node: [Intel® QuickAssist Technology Driver for Linux* – HW 
Version 
2.0](https://www.intel.com/content/www/us/en/download/765501/intel-quickassist-technology-driver-for-linux-hw-version-2-0.html?wapkw=quickassist).
 
 ## Software Requirements
+
 - Download QAT driver for your system, and follow its README to build and 
install on all Driver and Worker nodes: [Intel® QuickAssist Technology Driver 
for Linux* – HW Version 
2.0](https://www.intel.com/content/www/us/en/download/765501/intel-quickassist-technology-driver-for-linux-hw-version-2-0.html?wapkw=quickassist).
 - Below compression libraries need to be installed on all Driver and Worker 
nodes:
   - Zlib* library of version 1.2.7 or higher
@@ -526,9 +564,7 @@ This feature is based on QAT driver library and 
[QATzip](https://github.com/inte
 
 ## Build Gluten with QAT
 
-1. Setup ICP_ROOT environment variable to the directory where QAT driver is 
extracted.
-This environment variable is required during building Gluten and running Spark 
applications.
-It's recommended to put it in .bashrc on Driver and Worker nodes.
+1. Setup ICP_ROOT environment variable to the directory where QAT driver is 
extracted. This environment variable is required during building Gluten and 
running Spark applications. It's recommended to put it in .bashrc on Driver and 
Worker nodes.
 
 ```bash
 echo "export ICP_ROOT=/path/to/QAT_driver" >> ~/.bashrc
@@ -540,9 +576,8 @@ echo "export ICP_ROOT=/path/to/QAT_driver" >> ~/.bashrc
 exit
 ```
 
-2. **This step is required if your application is running as Non-root user**.
-The users must be added to the 'qat' group after QAT drvier is installed.
-And change the amount of max locked memory for the username that is included 
in the group name. This can be done by specifying the limit in 
/etc/security/limits.conf.
+2. **This step is required if your application is running as Non-root user.**
+   The users must be added to the 'qat' group after QAT driver is installed. 
And change the amount of max locked memory for the username that is included in 
the group name. This can be done by specifying the limit in 
/etc/security/limits.conf.
 
 ```bash
 sudo su -
@@ -554,9 +589,7 @@ echo "@qat - memlock 500000" >> /etc/security/limits.conf
 exit
 ```
 
-3. Enable huge page. This step is required to execute each time after system 
reboot. We recommend using systemctl to manage at system startup.
-You change the values for "max_huge_pages" and "max_huge_pages_per_process" to 
make sure there are enough resources for your workload.
-As for Spark applications, one process matches one executor. Within the 
executor, every task is allocated a maximum of 5 huge pages.
+3. Enable huge page. This step is required to execute each time after system 
reboot. We recommend using systemctl to manage at system startup. You change 
the values for "max_huge_pages" and "max_huge_pages_per_process" to make sure 
there are enough resources for your workload. As for Spark applications, one 
process matches one executor. Within the executor, every task is allocated a 
maximum of 5 huge pages.
 
 ```bash
 sudo su -
@@ -638,7 +671,7 @@ There is 8 QAT acceleration device(s) in the system:
 --conf spark.gluten.sql.columnar.shuffle.codecBackend=qat
 ```
 
-4. You can use below command to check whether QAT is working normally at 
run-time. The value of fw_counters should continue to increase during shuffle. 
+4. You can use below command to check whether QAT is working normally at 
run-time. The value of fw_counters should continue to increase during shuffle.
 
 ```
 while :; do cat /sys/kernel/debug/qat_4xxx_0000:6b:00.0/fw_counters; sleep 1; 
done
@@ -674,7 +707,8 @@ This feature is based on Intel® 
[QPL](https://github.com/intel/qpl).
 
 Gluten will internally build and link to a specific version of QPL library, 
but extra environment setup is still required. Please refer to [QPL 
Installation 
Guide](https://intel.github.io/qpl/documentation/get_started_docs/installation.html)
 to install dependencies and configure accelerators.
 
-**This step is required if your application is running as Non-root user**. 
Create a group for the users who have privilege to use IAA, and grant group iaa 
read/write access to the IAA Work-Queues.
+**This step is required if your application is running as Non-root user.**
+Create a group for the users who have privilege to use IAA, and grant group 
iaa read/write access to the IAA Work-Queues.
 
 ```bash
 sudo groupadd iaa
@@ -682,7 +716,7 @@ sudo usermod -aG iaa username # need to relogin
 sudo chgrp -R iaa /dev/iax
 sudo chmod -R g+rw /dev/iax
 ```
- 
+
 After the set-up, you can now build Gluten with QAT. Below command is used to 
enable this feature
 
 ```bash
@@ -701,6 +735,7 @@ sudo ls -l /dev/iax
 ```
 
 The output should be like:
+
 ```
 total 0
 crw-rw---- 1 root iaa 509, 0 Apr  5 18:54 wq1.0
@@ -732,7 +767,7 @@ Check out the [Intel® Query Processing Library (Intel® QPL) 
Documentation](htt
 
 # Test TPC-H or TPC-DS on Gluten with Velox backend
 
-All TPC-H and TPC-DS queries are supported in Gluten Velox backend.  
+All TPC-H and TPC-DS queries are supported in Gluten Velox backend.
 
 ## Data preparation
 
@@ -743,6 +778,7 @@ The used TPC-H and TPC-DS queries are the original ones, 
and can be accessed fro
 and [TPC-H queries](../../gluten-core/src/test/resources/tpch-queries).
 
 Some other versions of TPC-DS queries are also provided, but are **not** 
recommended for testing, including:
+
 - the modified TPC-DS queries with "Decimal-to-Double": [TPC-DS non-decimal 
queries](../../gluten-core/src/test/resources/tpcds-queries/tpcds.queries.no-decimal)
 (outdated).
 
 ## Submit the Spark SQL job
@@ -755,7 +791,7 @@ var parquet_file_path = "/PATH/TO/TPCH_PARQUET_PATH"
 var gluten_root = "/PATH/TO/GLUTEN"
 ```
 
-Below script shows an example about how to run the testing, you should modify 
the parameters such as executor cores, memory, offHeap size based on your 
environment. 
+Below script shows an example about how to run the testing, you should modify 
the parameters such as executor cores, memory, offHeap size based on your 
environment.
 
 ```bash
 export GLUTEN_JAR = /PATH/TO/GLUTEN/package/target/<gluten-jar>
@@ -779,6 +815,7 @@ cat tpch_parquet.scala | spark-shell --name 
tpch_powertest_velox \
 Refer to [Gluten configuration](../Configuration.md) for more details.
 
 ## Result
+
 *wholestagetransformer* indicates that the offload works.
 
 ![TPC-H Q6](../image/TPC-H_Q6_DAG.png)
@@ -908,7 +945,7 @@ ashProbe: Input: 9 rows (864B, 3 batches), Output: 27 rows 
(3.56KB, 3 batches),
 
 # Gluten Implicits
 
-Gluten provides a helper class to get the fallback summary from a Spark 
Dataset. 
+Gluten provides a helper class to get the fallback summary from a Spark 
Dataset.
 
 ```
 import org.apache.spark.sql.execution.GlutenImplicits._


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to