This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new fa5c64b9 [AURON #1374] Introduce AuronUDFWrapperContext (#1375)
fa5c64b9 is described below
commit fa5c64b99f0633fddb35c957df62c4092f5bf2cd
Author: zhangmang <[email protected]>
AuthorDate: Mon Sep 29 16:12:23 2025 +0800
[AURON #1374] Introduce AuronUDFWrapperContext (#1375)
* [AURON #1374] Introduce AuronUDFWrapperContext
* fix compile
* fix checkstyle
* add test regexp_extract
* fix checkstyle
---
.../auron/functions/AuronUDFWrapperContext.java | 47 ++++++++++++++++++++++
.../java/org/apache/auron/jni/AuronAdaptor.java | 11 +++++
.../main/java/org/apache/auron/jni/JniBridge.java | 6 +++
.../MockAuronUDFWrapperContext.java} | 24 ++++++-----
.../org/apache/auron/jni/MockAuronAdaptor.java | 8 ++++
native-engine/auron-jni-bridge/src/jni_bridge.rs | 47 ++++++++++++++++++----
.../datafusion-ext-exprs/src/spark_udf_wrapper.rs | 4 +-
.../spark/sql/auron/AuronFunctionSuite.scala | 9 +++++
spark-extension/pom.xml | 5 +++
.../java/org/apache/spark/sql/auron/JniBridge.java | 6 +++
.../spark/sql/SparkAuronUDFWrapperContext.scala} | 22 +++++-----
11 files changed, 159 insertions(+), 30 deletions(-)
diff --git
a/auron-core/src/main/java/org/apache/auron/functions/AuronUDFWrapperContext.java
b/auron-core/src/main/java/org/apache/auron/functions/AuronUDFWrapperContext.java
new file mode 100644
index 00000000..110b4fcc
--- /dev/null
+++
b/auron-core/src/main/java/org/apache/auron/functions/AuronUDFWrapperContext.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.functions;
+
+/**
+ * Wrapper context for user-defined functions (UDFs).
+ * This interface bridges different engines and native UDF implementations.
+ * SQL engines such as Spark and Flink should provide their respective implementations of it.
+ */
+public interface AuronUDFWrapperContext {
+
+ /**
+ * Opens the UDF context with the given resource ID. The default implementation does nothing.
+ * The Flink engine requires the FunctionContext, which can be obtained via the ResourceID, to initialize the Flink ScalarFunction.
+ * @param resourceId identifier from which the engine can obtain the resources this UDF needs
+ */
+ default void open(String resourceId) {}
+
+ /**
+ * Evaluates the UDF with the provided input and output pointers.
+ * In the Spark path native code calls this once per batch of argument rows, not once per row.
+ *
+ * @param inputPtr Native pointer to the input data (in the Spark path, an exported Arrow C {@code ArrowArray} of argument columns)
+ * @param outputPtr Native pointer to the output location where results should be written (in the Spark path, an empty Arrow C {@code ArrowArray})
+ */
+ void eval(long inputPtr, long outputPtr);
+
+ /**
+ * Closes the UDF context. The default implementation does nothing.
+ * Some UDFs may need to perform resource cleanup operations.
+ */
+ default void close() {}
+}
diff --git a/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
b/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
index 4c1d27b8..71a6349e 100644
--- a/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
+++ b/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
@@ -18,7 +18,9 @@ package org.apache.auron.jni;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.auron.configuration.AuronConfiguration;
+import org.apache.auron.functions.AuronUDFWrapperContext;
import org.apache.auron.memory.OnHeapSpillManager;
/**
@@ -119,4 +121,13 @@ public abstract class AuronAdaptor {
* Retrieves the AuronConfiguration, It bundles the corresponding engine's
Config.
*/
public abstract AuronConfiguration getAuronConfiguration();
+
+ /**
+ * Retrieves the UDF wrapper context for a serialized UDF; invoked via {@link JniBridge#getAuronUDFWrapperContext}. Each engine requires its own implementation.
+ *
+ * @param udfSerialized The serialized UDF definition, which the engine implementation is expected to deserialize.
+ * @return An instance of AuronUDFWrapperContext.
+ * @throws UnsupportedOperationException If the engine does not support creating UDF wrapper contexts through this API.
+ */
+ public abstract AuronUDFWrapperContext
getAuronUDFWrapperContext(ByteBuffer udfSerialized);
}
diff --git a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
index 85831550..6155cce1 100644
--- a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
+++ b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
@@ -20,8 +20,10 @@ import java.io.IOException;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.net.URI;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.auron.functions.AuronUDFWrapperContext;
import org.apache.auron.hadoop.fs.FSDataInputWrapper;
import org.apache.auron.hadoop.fs.FSDataOutputWrapper;
import org.apache.auron.memory.OnHeapSpillManager;
@@ -96,4 +98,8 @@ public class JniBridge {
public static String getDirectWriteSpillToDiskFile() throws IOException {
return AuronAdaptor.getInstance().getDirectWriteSpillToDiskFile();
}
+
+ /**
+ * Returns the UDF wrapper context for a serialized UDF by delegating to the adaptor returned by
+ * {@link AuronAdaptor#getInstance()}.
+ *
+ * @param udfSerialized serialized UDF definition
+ * @return the engine-specific wrapper context produced by the adaptor
+ */
+ public static AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer
udfSerialized) {
+ return
AuronAdaptor.getInstance().getAuronUDFWrapperContext(udfSerialized);
+ }
}
diff --git
a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
b/auron-core/src/test/java/org/apache/auron/functions/MockAuronUDFWrapperContext.java
similarity index 53%
copy from auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
copy to
auron-core/src/test/java/org/apache/auron/functions/MockAuronUDFWrapperContext.java
index c2cac51b..0dfd196b 100644
--- a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
+++
b/auron-core/src/test/java/org/apache/auron/functions/MockAuronUDFWrapperContext.java
@@ -14,22 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.auron.jni;
+package org.apache.auron.functions;
-import org.apache.auron.configuration.AuronConfiguration;
-import org.apache.auron.configuration.MockAuronConfiguration;
+import java.nio.ByteBuffer;
/**
- * This is a mock class for testing the AuronAdaptor.
+ * Mock {@link AuronUDFWrapperContext} for tests; it interprets no UDF and its eval is a no-op.
*/
-public class MockAuronAdaptor extends AuronAdaptor {
- @Override
- public void loadAuronLib() {
- // Mock implementation, no need to load auron library
+public class MockAuronUDFWrapperContext implements AuronUDFWrapperContext {
+
+ public MockAuronUDFWrapperContext(ByteBuffer udfSerialized) {
+ // Mock implementation: the serialized UDF bytes are read but never interpreted. A real
+ // implementation would deserialize them to obtain what it needs to initialize the UDF.
+ byte[] bytes = new byte[udfSerialized.remaining()];
+ udfSerialized.get(bytes);
+ // Note: get(bytes) consumes the remaining bytes, leaving the buffer's position at its limit.
+ // A real implementation would, e.g., read the UDF class name from these bytes and initialize it.
}
@Override
- public AuronConfiguration getAuronConfiguration() {
- return new MockAuronConfiguration();
+ public void eval(long inputPtr, long outputPtr) {
+ // No-op mock: a real implementation would read input via inputPtr and write results via outputPtr.
}
}
diff --git
a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
b/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
index c2cac51b..9d37486f 100644
--- a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
+++ b/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
@@ -16,8 +16,11 @@
*/
package org.apache.auron.jni;
+import java.nio.ByteBuffer;
import org.apache.auron.configuration.AuronConfiguration;
import org.apache.auron.configuration.MockAuronConfiguration;
+import org.apache.auron.functions.AuronUDFWrapperContext;
+import org.apache.auron.functions.MockAuronUDFWrapperContext;
/**
* This is a mock class for testing the AuronAdaptor.
@@ -32,4 +35,9 @@ public class MockAuronAdaptor extends AuronAdaptor {
public AuronConfiguration getAuronConfiguration() {
return new MockAuronConfiguration();
}
+
+ /** Returns a {@link MockAuronUDFWrapperContext} constructed from {@code udfSerialized}. */
+ @Override
+ public AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer
udfSerialized) {
+ return new MockAuronUDFWrapperContext(udfSerialized);
+ }
}
diff --git a/native-engine/auron-jni-bridge/src/jni_bridge.rs
b/native-engine/auron-jni-bridge/src/jni_bridge.rs
index 30dda802..7bca2ff3 100644
--- a/native-engine/auron-jni-bridge/src/jni_bridge.rs
+++ b/native-engine/auron-jni-bridge/src/jni_bridge.rs
@@ -439,7 +439,7 @@ pub struct JavaClasses<'a> {
pub cSparkFileSegment: SparkFileSegment<'a>,
pub cSparkSQLMetric: SparkSQLMetric<'a>,
pub cSparkMetricNode: SparkMetricNode<'a>,
- pub cSparkUDFWrapperContext: SparkUDFWrapperContext<'a>,
+ pub cSparkAuronUDFWrapperContext: SparkAuronUDFWrapperContext<'a>,
pub cSparkUDAFWrapperContext: SparkUDAFWrapperContext<'a>,
pub cSparkUDTFWrapperContext: SparkUDTFWrapperContext<'a>,
pub cSparkUDAFMemTracker: SparkUDAFMemTracker<'a>,
@@ -453,6 +453,8 @@ pub struct JavaClasses<'a> {
pub cAuronFSDataInputWrapper: AuronFSDataInputWrapper<'a>,
pub cAuronFSDataOutputWrapper: AuronFSDataOutputWrapper<'a>,
pub cAuronJsonFallbackWrapper: AuronJsonFallbackWrapper<'a>,
+
+ pub cAuronUDFWrapperContext: AuronUDFWrapperContext<'a>,
}
#[allow(clippy::non_send_fields_in_send_ty)]
@@ -503,7 +505,7 @@ impl JavaClasses<'static> {
cSparkFileSegment: SparkFileSegment::new(env)?,
cSparkSQLMetric: SparkSQLMetric::new(env)?,
cSparkMetricNode: SparkMetricNode::new(env)?,
- cSparkUDFWrapperContext: SparkUDFWrapperContext::new(env)?,
+ cSparkAuronUDFWrapperContext:
SparkAuronUDFWrapperContext::new(env)?,
cSparkUDAFWrapperContext: SparkUDAFWrapperContext::new(env)?,
cSparkUDTFWrapperContext: SparkUDTFWrapperContext::new(env)?,
cSparkUDAFMemTracker: SparkUDAFMemTracker::new(env)?,
@@ -517,6 +519,8 @@ impl JavaClasses<'static> {
cAuronFSDataInputWrapper: AuronFSDataInputWrapper::new(env)?,
cAuronFSDataOutputWrapper: AuronFSDataOutputWrapper::new(env)?,
cAuronJsonFallbackWrapper: AuronJsonFallbackWrapper::new(env)?,
+
+ cAuronUDFWrapperContext: AuronUDFWrapperContext::new(env)?,
};
log::info!("Initializing JavaClasses finished");
Ok(java_classes)
@@ -573,6 +577,9 @@ pub struct JniBridge<'a> {
pub method_getDirectWriteSpillToDiskFile_ret: ReturnType,
pub method_initNativeThread: JStaticMethodID,
pub method_initNativeThread_ret: ReturnType,
+
+ pub method_getAuronUDFWrapperContext: JStaticMethodID,
+ pub method_getAuronUDFWrapperContext_ret: ReturnType,
}
impl<'a> JniBridge<'a> {
pub const SIG_TYPE: &'static str = "org/apache/spark/sql/auron/JniBridge";
@@ -657,6 +664,13 @@ impl<'a> JniBridge<'a> {
"(Ljava/lang/ClassLoader;Lorg/apache/spark/TaskContext;)V",
)?,
method_initNativeThread_ret:
ReturnType::Primitive(Primitive::Void),
+
+ method_getAuronUDFWrapperContext: env.get_static_method_id(
+ class,
+ "getAuronUDFWrapperContext",
+
"(Ljava/nio/ByteBuffer;)Lorg/apache/auron/functions/AuronUDFWrapperContext;",
+ )?,
+ method_getAuronUDFWrapperContext_ret: ReturnType::Object,
})
}
}
@@ -1193,18 +1207,18 @@ impl<'a> AuronRssPartitionWriterBase<'_> {
}
#[allow(non_snake_case)]
-pub struct SparkUDFWrapperContext<'a> {
+pub struct SparkAuronUDFWrapperContext<'a> {
pub class: JClass<'a>,
pub ctor: JMethodID,
pub method_eval: JMethodID,
pub method_eval_ret: ReturnType,
}
-impl<'a> SparkUDFWrapperContext<'a> {
- pub const SIG_TYPE: &'static str =
"org/apache/spark/sql/auron/SparkUDFWrapperContext";
+impl<'a> SparkAuronUDFWrapperContext<'a> {
+ pub const SIG_TYPE: &'static str =
"org/apache/auron/spark/sql/SparkAuronUDFWrapperContext";
- pub fn new(env: &JNIEnv<'a>) -> JniResult<SparkUDFWrapperContext<'a>> {
+ pub fn new(env: &JNIEnv<'a>) -> JniResult<SparkAuronUDFWrapperContext<'a>>
{
let class = get_global_jclass(env, Self::SIG_TYPE)?;
- Ok(SparkUDFWrapperContext {
+ Ok(SparkAuronUDFWrapperContext {
class,
ctor: env.get_method_id(class, "<init>",
"(Ljava/nio/ByteBuffer;)V")?,
method_eval: env.get_method_id(class, "eval", "(JJ)V")?,
@@ -1213,6 +1227,25 @@ impl<'a> SparkUDFWrapperContext<'a> {
}
}
+/// Cached JNI handles for the engine-neutral Java interface
+/// `org.apache.auron.functions.AuronUDFWrapperContext`.
+///
+/// Unlike `SparkAuronUDFWrapperContext`, no constructor ID is cached: the Java
+/// type is an interface, so instances are not constructed here (presumably they
+/// come from `JniBridge.getAuronUDFWrapperContext`).
+#[allow(non_snake_case)]
+pub struct AuronUDFWrapperContext<'a> {
+ /// Class handle resolved via `get_global_jclass`.
+ pub class: JClass<'a>,
+ /// Method ID of `eval(long inputPtr, long outputPtr)` (JNI signature `(JJ)V`).
+ pub method_eval: JMethodID,
+ /// Return type of `eval` (void).
+ pub method_eval_ret: ReturnType,
+}
+impl<'a> AuronUDFWrapperContext<'a> {
+ /// JNI internal name of the Java interface.
+ pub const SIG_TYPE: &'static str =
"org/apache/auron/functions/AuronUDFWrapperContext";
+
+ /// Looks up the interface class and the `eval` method ID, propagating any
+ /// JNI lookup error.
+ pub fn new(env: &JNIEnv<'a>) -> JniResult<AuronUDFWrapperContext<'a>> {
+ let class = get_global_jclass(env, Self::SIG_TYPE)?;
+ Ok(AuronUDFWrapperContext {
+ class,
+ method_eval: env.get_method_id(class, "eval", "(JJ)V")?,
+ method_eval_ret: ReturnType::Primitive(Primitive::Void),
+ })
+ }
+}
+
#[allow(non_snake_case)]
pub struct SparkUDAFWrapperContext<'a> {
pub class: JClass<'a>,
diff --git a/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
b/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
index edede8f1..3c1a94fd 100644
--- a/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
+++ b/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
@@ -94,7 +94,7 @@ impl SparkUDFWrapperExpr {
.get_or_try_init(|| {
let serialized_buf =
jni_new_direct_byte_buffer!(&self.serialized)?;
let jcontext_local =
-
jni_new_object!(SparkUDFWrapperContext(serialized_buf.as_obj()))?;
+
jni_new_object!(SparkAuronUDFWrapperContext(serialized_buf.as_obj()))?;
jni_new_global_ref!(jcontext_local.as_obj())
})
.cloned()
@@ -213,7 +213,7 @@ fn invoke_udf(
let struct_array = StructArray::from(params_batch);
let mut export_ffi_array = FFI_ArrowArray::new(&struct_array.to_data());
let mut import_ffi_array = FFI_ArrowArray::empty();
- jni_call!(SparkUDFWrapperContext(jcontext.as_obj()).eval(
+ jni_call!(SparkAuronUDFWrapperContext(jcontext.as_obj()).eval(
&mut export_ffi_array as *mut FFI_ArrowArray as i64,
&mut import_ffi_array as *mut FFI_ArrowArray as i64,
) -> ())?;
diff --git
a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
b/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
index a0f300a6..2f7e5707 100644
---
a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
+++
b/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
@@ -111,4 +111,13 @@ class AuronFunctionSuite
}
}
}
+
+ // Per its name, this test is meant to exercise regexp_extract through the UDF fallback
+ // (Spark UDF wrapper) path; confirm regexp_extract is not converted to a native expression.
+ // NOTE(review): "failback" in the test name looks like a typo for "fallback".
+ test("regexp_extract function with UDF failback") {
+ withTable("t1") {
+ sql("create table t1(c1 string) using parquet")
+ sql("insert into t1 values('Auron Spark SQL')")
+ // Group 1 of '^A(.*)L$' is everything between the leading 'A' and the trailing 'L'.
+ val df = sql("select regexp_extract(c1, '^A(.*)L$', 1) from t1")
+ checkAnswer(df, Seq(Row("uron Spark SQ")))
+ }
+ }
}
diff --git a/spark-extension/pom.xml b/spark-extension/pom.xml
index bc6ee552..f72bb487 100644
--- a/spark-extension/pom.xml
+++ b/spark-extension/pom.xml
@@ -29,6 +29,11 @@
<packaging>jar</packaging>
<dependencies>
+ <!-- auron-core provides org.apache.auron.functions.AuronUDFWrapperContext, which this
+ module's JniBridge and SparkAuronUDFWrapperContext reference. -->
+ <dependency>
+ <groupId>org.apache.auron</groupId>
+ <artifactId>auron-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.auron</groupId>
<artifactId>hadoop-shim_${scalaVersion}</artifactId>
diff --git
a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
b/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
index 9a2ac12b..af035c7a 100644
--- a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
+++ b/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
@@ -19,8 +19,10 @@ package org.apache.spark.sql.auron;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.net.URI;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.auron.functions.AuronUDFWrapperContext;
import org.apache.auron.hadoop.fs.FSDataInputWrapper;
import org.apache.auron.hadoop.fs.FSDataInputWrapper$;
import org.apache.auron.hadoop.fs.FSDataOutputWrapper;
@@ -121,4 +123,8 @@ public class JniBridge {
TaskContextHelper$.MODULE$.setNativeThreadName();
TaskContextHelper$.MODULE$.setHDFSCallerContext();
}
+
+ /**
+ * Engine-neutral UDF wrapper entry point; not supported by the Spark engine.
+ *
+ * <p>The native JniBridge bindings resolve this static method on this class while they are
+ * initialized, so it must be present for that lookup to succeed even though Spark does not use
+ * it: Spark UDFs are wrapped by native code constructing {@code SparkAuronUDFWrapperContext}
+ * directly.
+ *
+ * @param udfSerialized serialized UDF definition (ignored)
+ * @throws UnsupportedOperationException always
+ */
+ public static AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer
udfSerialized) {
+ throw new UnsupportedOperationException("This API is designed to
support next-generation multi-engine.");
+ }
}
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDFWrapperContext.scala
b/spark-extension/src/main/scala/org/apache/auron/spark/sql/SparkAuronUDFWrapperContext.scala
similarity index 83%
rename from
spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDFWrapperContext.scala
rename to
spark-extension/src/main/scala/org/apache/auron/spark/sql/SparkAuronUDFWrapperContext.scala
index 228079f0..9161a9d8 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDFWrapperContext.scala
+++
b/spark-extension/src/main/scala/org/apache/auron/spark/sql/SparkAuronUDFWrapperContext.scala
@@ -14,29 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.auron
+package org.apache.auron.spark.sql
import java.nio.ByteBuffer
-import org.apache.arrow.c.ArrowArray
-import org.apache.arrow.c.Data
+import org.apache.arrow.c.{ArrowArray, Data}
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.DictionaryProvider
import
org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.util.Using
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.expressions.Nondeterministic
-import org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils
+import org.apache.spark.sql.catalyst.expressions.{Expression,
GenericInternalRow, Nondeterministic}
+import org.apache.spark.sql.execution.auron.arrowio.util.{ArrowUtils,
ArrowWriter}
import
org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils.ROOT_ALLOCATOR
-import org.apache.spark.sql.execution.auron.arrowio.util.ArrowWriter
import org.apache.spark.sql.execution.auron.columnar.ColumnarHelper
-import org.apache.spark.sql.types.StructField
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
-case class SparkUDFWrapperContext(serialized: ByteBuffer) extends Logging {
+import org.apache.auron.functions.AuronUDFWrapperContext
+
+case class SparkAuronUDFWrapperContext(serialized: ByteBuffer)
+ extends AuronUDFWrapperContext
+ with Logging {
private val (expr, javaParamsSchema) =
NativeConverters.deserializeExpression[Expression, StructType]({
val bytes = new Array[Byte](serialized.remaining())