This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new fa5c64b9 [AURON #1374] Introduce AuronUDFWrapperContext (#1375)
fa5c64b9 is described below

commit fa5c64b99f0633fddb35c957df62c4092f5bf2cd
Author: zhangmang <[email protected]>
AuthorDate: Mon Sep 29 16:12:23 2025 +0800

    [AURON #1374] Introduce AuronUDFWrapperContext (#1375)
    
    * [AURON #1374] Introduce AuronUDFWrapperContext
    
    * fix compile
    
    * fix checkstyle
    
    * add test regexp_extract
    
    * fix checkstyle
---
 .../auron/functions/AuronUDFWrapperContext.java    | 47 ++++++++++++++++++++++
 .../java/org/apache/auron/jni/AuronAdaptor.java    | 11 +++++
 .../main/java/org/apache/auron/jni/JniBridge.java  |  6 +++
 .../MockAuronUDFWrapperContext.java}               | 24 ++++++-----
 .../org/apache/auron/jni/MockAuronAdaptor.java     |  8 ++++
 native-engine/auron-jni-bridge/src/jni_bridge.rs   | 47 ++++++++++++++++++----
 .../datafusion-ext-exprs/src/spark_udf_wrapper.rs  |  4 +-
 .../spark/sql/auron/AuronFunctionSuite.scala       |  9 +++++
 spark-extension/pom.xml                            |  5 +++
 .../java/org/apache/spark/sql/auron/JniBridge.java |  6 +++
 .../spark/sql/SparkAuronUDFWrapperContext.scala}   | 22 +++++-----
 11 files changed, 159 insertions(+), 30 deletions(-)

diff --git 
a/auron-core/src/main/java/org/apache/auron/functions/AuronUDFWrapperContext.java
 
b/auron-core/src/main/java/org/apache/auron/functions/AuronUDFWrapperContext.java
new file mode 100644
index 00000000..110b4fcc
--- /dev/null
+++ 
b/auron-core/src/main/java/org/apache/auron/functions/AuronUDFWrapperContext.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.functions;
+
+/**
+ * Wrapper context for user-defined functions (UDFs).
+ *
+ * <p>This interface bridges different SQL engines and the native UDF implementation. Engines such
+ * as Spark and Flink should provide their respective implementations based on it; native code
+ * obtains an instance through the {@code getAuronUDFWrapperContext(ByteBuffer)} entry point of
+ * {@code JniBridge} and calls {@link #eval} on it.
+ */
+public interface AuronUDFWrapperContext {
+
+    /**
+     * Opens the UDF context with the given resource ID.
+     *
+     * <p>The Flink engine requires the FunctionContext, which can be obtained via the ResourceID,
+     * to initialize the Flink ScalarFunction. The default implementation does nothing.
+     *
+     * @param resourceId ID the engine uses to look up the resources (e.g. Flink's FunctionContext)
+     *     needed to initialize the UDF
+     */
+    default void open(String resourceId) {}
+
+    /**
+     * Evaluates the UDF over the data referenced by {@code inputPtr} and writes the results to
+     * the location referenced by {@code outputPtr}.
+     *
+     * <p>NOTE(review): the existing Spark native caller passes the addresses of Arrow C Data
+     * Interface {@code FFI_ArrowArray} structs holding a whole parameter batch (exported input,
+     * empty output); presumably other engines follow the same convention — confirm.
+     *
+     * @param inputPtr Native pointer to the input data
+     * @param outputPtr Native pointer to the output location where results should be written
+     */
+    void eval(long inputPtr, long outputPtr);
+
+    /**
+     * Closes the UDF context.
+     *
+     * <p>Some UDFs may need to perform resource cleanup operations. The default implementation
+     * does nothing.
+     */
+    default void close() {}
+}
diff --git a/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java 
b/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
index 4c1d27b8..71a6349e 100644
--- a/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
+++ b/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
@@ -18,7 +18,9 @@ package org.apache.auron.jni;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import org.apache.auron.configuration.AuronConfiguration;
+import org.apache.auron.functions.AuronUDFWrapperContext;
 import org.apache.auron.memory.OnHeapSpillManager;
 
 /**
@@ -119,4 +121,13 @@ public abstract class AuronAdaptor {
      * Retrieves the AuronConfiguration, It bundles the corresponding engine's 
Config.
      */
     public abstract AuronConfiguration getAuronConfiguration();
+
+    /**
+     * Retrieves the UDF wrapper context. Each engine requires its own 
implementation.
+     *
+     * @param udfSerialized The serialized UDF context.
+     * @return An instance of AuronUDFWrapperContext.
+     * @throws UnsupportedOperationException If the method is not implemented.
+     */
+    public abstract AuronUDFWrapperContext 
getAuronUDFWrapperContext(ByteBuffer udfSerialized);
 }
diff --git a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java 
b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
index 85831550..6155cce1 100644
--- a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
+++ b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
@@ -20,8 +20,10 @@ import java.io.IOException;
 import java.lang.management.BufferPoolMXBean;
 import java.lang.management.ManagementFactory;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.auron.functions.AuronUDFWrapperContext;
 import org.apache.auron.hadoop.fs.FSDataInputWrapper;
 import org.apache.auron.hadoop.fs.FSDataOutputWrapper;
 import org.apache.auron.memory.OnHeapSpillManager;
@@ -96,4 +98,8 @@ public class JniBridge {
     public static String getDirectWriteSpillToDiskFile() throws IOException {
         return AuronAdaptor.getInstance().getDirectWriteSpillToDiskFile();
     }
+
+    public static AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer 
udfSerialized) {
+        return 
AuronAdaptor.getInstance().getAuronUDFWrapperContext(udfSerialized);
+    }
 }
diff --git 
a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java 
b/auron-core/src/test/java/org/apache/auron/functions/MockAuronUDFWrapperContext.java
similarity index 53%
copy from auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
copy to 
auron-core/src/test/java/org/apache/auron/functions/MockAuronUDFWrapperContext.java
index c2cac51b..0dfd196b 100644
--- a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
+++ 
b/auron-core/src/test/java/org/apache/auron/functions/MockAuronUDFWrapperContext.java
@@ -14,22 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.auron.jni;
+package org.apache.auron.functions;
 
-import org.apache.auron.configuration.AuronConfiguration;
-import org.apache.auron.configuration.MockAuronConfiguration;
+import java.nio.ByteBuffer;
 
 /**
- * This is a mock class for testing the AuronAdaptor.
+ * Mock {@link AuronUDFWrapperContext} used by auron-core tests; it performs no real evaluation.
  */
-public class MockAuronAdaptor extends AuronAdaptor {
-    @Override
-    public void loadAuronLib() {
-        // Mock implementation, no need to load auron library
+public class MockAuronUDFWrapperContext implements AuronUDFWrapperContext {
+
+    /**
+     * Creates the mock from a serialized UDF description.
+     *
+     * @param udfSerialized serialized UDF payload; its remaining bytes are read (advancing the
+     *     buffer's position to its limit) but otherwise ignored
+     */
+    public MockAuronUDFWrapperContext(ByteBuffer udfSerialized) {
+        // Mock implementation: a real engine would obtain the information required for
+        // initializing the UDF by deserializing these bytes.
+        byte[] bytes = new byte[udfSerialized.remaining()];
+        udfSerialized.get(bytes);
+        // Deserialize the UDF information.
+        // get the UDF class name and initialize the UDF.
     }
 
     @Override
-    public AuronConfiguration getAuronConfiguration() {
-        return new MockAuronConfiguration();
+    public void eval(long inputPtr, long outputPtr) {
+        // Mock implementation: does nothing; a real implementation would use inputPtr and
+        // outputPtr to process the data.
     }
 }
diff --git 
a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java 
b/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
index c2cac51b..9d37486f 100644
--- a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
+++ b/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
@@ -16,8 +16,11 @@
  */
 package org.apache.auron.jni;
 
+import java.nio.ByteBuffer;
 import org.apache.auron.configuration.AuronConfiguration;
 import org.apache.auron.configuration.MockAuronConfiguration;
+import org.apache.auron.functions.AuronUDFWrapperContext;
+import org.apache.auron.functions.MockAuronUDFWrapperContext;
 
 /**
  * This is a mock class for testing the AuronAdaptor.
@@ -32,4 +35,9 @@ public class MockAuronAdaptor extends AuronAdaptor {
     public AuronConfiguration getAuronConfiguration() {
         return new MockAuronConfiguration();
     }
+
+    @Override
+    public AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer udfSerialized) {
+        // Tests get a mock context built from the serialized payload.
+        final AuronUDFWrapperContext mockContext = new MockAuronUDFWrapperContext(udfSerialized);
+        return mockContext;
+    }
 }
diff --git a/native-engine/auron-jni-bridge/src/jni_bridge.rs 
b/native-engine/auron-jni-bridge/src/jni_bridge.rs
index 30dda802..7bca2ff3 100644
--- a/native-engine/auron-jni-bridge/src/jni_bridge.rs
+++ b/native-engine/auron-jni-bridge/src/jni_bridge.rs
@@ -439,7 +439,7 @@ pub struct JavaClasses<'a> {
     pub cSparkFileSegment: SparkFileSegment<'a>,
     pub cSparkSQLMetric: SparkSQLMetric<'a>,
     pub cSparkMetricNode: SparkMetricNode<'a>,
-    pub cSparkUDFWrapperContext: SparkUDFWrapperContext<'a>,
+    pub cSparkAuronUDFWrapperContext: SparkAuronUDFWrapperContext<'a>,
     pub cSparkUDAFWrapperContext: SparkUDAFWrapperContext<'a>,
     pub cSparkUDTFWrapperContext: SparkUDTFWrapperContext<'a>,
     pub cSparkUDAFMemTracker: SparkUDAFMemTracker<'a>,
@@ -453,6 +453,8 @@ pub struct JavaClasses<'a> {
     pub cAuronFSDataInputWrapper: AuronFSDataInputWrapper<'a>,
     pub cAuronFSDataOutputWrapper: AuronFSDataOutputWrapper<'a>,
     pub cAuronJsonFallbackWrapper: AuronJsonFallbackWrapper<'a>,
+
+    pub cAuronUDFWrapperContext: AuronUDFWrapperContext<'a>,
 }
 
 #[allow(clippy::non_send_fields_in_send_ty)]
@@ -503,7 +505,7 @@ impl JavaClasses<'static> {
                 cSparkFileSegment: SparkFileSegment::new(env)?,
                 cSparkSQLMetric: SparkSQLMetric::new(env)?,
                 cSparkMetricNode: SparkMetricNode::new(env)?,
-                cSparkUDFWrapperContext: SparkUDFWrapperContext::new(env)?,
+                cSparkAuronUDFWrapperContext: 
SparkAuronUDFWrapperContext::new(env)?,
                 cSparkUDAFWrapperContext: SparkUDAFWrapperContext::new(env)?,
                 cSparkUDTFWrapperContext: SparkUDTFWrapperContext::new(env)?,
                 cSparkUDAFMemTracker: SparkUDAFMemTracker::new(env)?,
@@ -517,6 +519,8 @@ impl JavaClasses<'static> {
                 cAuronFSDataInputWrapper: AuronFSDataInputWrapper::new(env)?,
                 cAuronFSDataOutputWrapper: AuronFSDataOutputWrapper::new(env)?,
                 cAuronJsonFallbackWrapper: AuronJsonFallbackWrapper::new(env)?,
+
+                cAuronUDFWrapperContext: AuronUDFWrapperContext::new(env)?,
             };
             log::info!("Initializing JavaClasses finished");
             Ok(java_classes)
@@ -573,6 +577,9 @@ pub struct JniBridge<'a> {
     pub method_getDirectWriteSpillToDiskFile_ret: ReturnType,
     pub method_initNativeThread: JStaticMethodID,
     pub method_initNativeThread_ret: ReturnType,
+
+    pub method_getAuronUDFWrapperContext: JStaticMethodID,
+    pub method_getAuronUDFWrapperContext_ret: ReturnType,
 }
 impl<'a> JniBridge<'a> {
     pub const SIG_TYPE: &'static str = "org/apache/spark/sql/auron/JniBridge";
@@ -657,6 +664,13 @@ impl<'a> JniBridge<'a> {
                 "(Ljava/lang/ClassLoader;Lorg/apache/spark/TaskContext;)V",
             )?,
             method_initNativeThread_ret: 
ReturnType::Primitive(Primitive::Void),
+
+            method_getAuronUDFWrapperContext: env.get_static_method_id(
+                class,
+                "getAuronUDFWrapperContext",
+                
"(Ljava/nio/ByteBuffer;)Lorg/apache/auron/functions/AuronUDFWrapperContext;",
+            )?,
+            method_getAuronUDFWrapperContext_ret: ReturnType::Object,
         })
     }
 }
@@ -1193,18 +1207,18 @@ impl<'a> AuronRssPartitionWriterBase<'_> {
 }
 
 #[allow(non_snake_case)]
-pub struct SparkUDFWrapperContext<'a> {
+pub struct SparkAuronUDFWrapperContext<'a> {
     pub class: JClass<'a>,
     pub ctor: JMethodID,
     pub method_eval: JMethodID,
     pub method_eval_ret: ReturnType,
 }
-impl<'a> SparkUDFWrapperContext<'a> {
-    pub const SIG_TYPE: &'static str = 
"org/apache/spark/sql/auron/SparkUDFWrapperContext";
+impl<'a> SparkAuronUDFWrapperContext<'a> {
+    pub const SIG_TYPE: &'static str = 
"org/apache/auron/spark/sql/SparkAuronUDFWrapperContext";
 
-    pub fn new(env: &JNIEnv<'a>) -> JniResult<SparkUDFWrapperContext<'a>> {
+    pub fn new(env: &JNIEnv<'a>) -> JniResult<SparkAuronUDFWrapperContext<'a>> 
{
         let class = get_global_jclass(env, Self::SIG_TYPE)?;
-        Ok(SparkUDFWrapperContext {
+        Ok(SparkAuronUDFWrapperContext {
             class,
             ctor: env.get_method_id(class, "<init>", 
"(Ljava/nio/ByteBuffer;)V")?,
             method_eval: env.get_method_id(class, "eval", "(JJ)V")?,
@@ -1213,6 +1227,25 @@ impl<'a> SparkUDFWrapperContext<'a> {
     }
 }
 
+/// JNI handle for the engine-agnostic Java interface
+/// `org.apache.auron.functions.AuronUDFWrapperContext`.
+///
+/// Only method IDs are resolved here: the type is a Java interface (no
+/// constructor), and instances are obtained through
+/// `JniBridge.getAuronUDFWrapperContext`. All three interface methods are
+/// resolved so native code can drive the `open`/`close` lifecycle hooks as
+/// well as `eval`.
+#[allow(non_snake_case)]
+pub struct AuronUDFWrapperContext<'a> {
+    pub class: JClass<'a>,
+    pub method_open: JMethodID,
+    pub method_open_ret: ReturnType,
+    pub method_eval: JMethodID,
+    pub method_eval_ret: ReturnType,
+    pub method_close: JMethodID,
+    pub method_close_ret: ReturnType,
+}
+impl<'a> AuronUDFWrapperContext<'a> {
+    pub const SIG_TYPE: &'static str = "org/apache/auron/functions/AuronUDFWrapperContext";
+
+    pub fn new(env: &JNIEnv<'a>) -> JniResult<AuronUDFWrapperContext<'a>> {
+        let class = get_global_jclass(env, Self::SIG_TYPE)?;
+        Ok(AuronUDFWrapperContext {
+            class,
+            // void open(String resourceId) — default method on the interface
+            method_open: env.get_method_id(class, "open", "(Ljava/lang/String;)V")?,
+            method_open_ret: ReturnType::Primitive(Primitive::Void),
+            // void eval(long inputPtr, long outputPtr)
+            method_eval: env.get_method_id(class, "eval", "(JJ)V")?,
+            method_eval_ret: ReturnType::Primitive(Primitive::Void),
+            // void close() — default method on the interface
+            method_close: env.get_method_id(class, "close", "()V")?,
+            method_close_ret: ReturnType::Primitive(Primitive::Void),
+        })
+    }
+}
+
 #[allow(non_snake_case)]
 pub struct SparkUDAFWrapperContext<'a> {
     pub class: JClass<'a>,
diff --git a/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs 
b/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
index edede8f1..3c1a94fd 100644
--- a/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
+++ b/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
@@ -94,7 +94,7 @@ impl SparkUDFWrapperExpr {
             .get_or_try_init(|| {
                 let serialized_buf = 
jni_new_direct_byte_buffer!(&self.serialized)?;
                 let jcontext_local =
-                    
jni_new_object!(SparkUDFWrapperContext(serialized_buf.as_obj()))?;
+                    
jni_new_object!(SparkAuronUDFWrapperContext(serialized_buf.as_obj()))?;
                 jni_new_global_ref!(jcontext_local.as_obj())
             })
             .cloned()
@@ -213,7 +213,7 @@ fn invoke_udf(
     let struct_array = StructArray::from(params_batch);
     let mut export_ffi_array = FFI_ArrowArray::new(&struct_array.to_data());
     let mut import_ffi_array = FFI_ArrowArray::empty();
-    jni_call!(SparkUDFWrapperContext(jcontext.as_obj()).eval(
+    jni_call!(SparkAuronUDFWrapperContext(jcontext.as_obj()).eval(
         &mut export_ffi_array as *mut FFI_ArrowArray as i64,
         &mut import_ffi_array as *mut FFI_ArrowArray as i64,
     ) -> ())?;
diff --git 
a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
 
b/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
index a0f300a6..2f7e5707 100644
--- 
a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
+++ 
b/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
@@ -111,4 +111,13 @@ class AuronFunctionSuite
       }
     }
   }
+
+  test("regexp_extract function with UDF fallback") {
+    withTable("t1") {
+      sql("create table t1(c1 string) using parquet")
+      // Cover a match, a non-match (Spark returns an empty string) and a NULL input
+      // (Spark returns NULL) so the fallback path is checked beyond the happy case.
+      sql("insert into t1 values('Auron Spark SQL'), ('Spark'), (null)")
+      val df = sql("select regexp_extract(c1, '^A(.*)L$', 1) from t1")
+      checkAnswer(df, Seq(Row("uron Spark SQ"), Row(""), Row(null)))
+    }
+  }
 }
diff --git a/spark-extension/pom.xml b/spark-extension/pom.xml
index bc6ee552..f72bb487 100644
--- a/spark-extension/pom.xml
+++ b/spark-extension/pom.xml
@@ -29,6 +29,11 @@
   <packaging>jar</packaging>
 
   <dependencies>
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>auron-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.auron</groupId>
       <artifactId>hadoop-shim_${scalaVersion}</artifactId>
diff --git 
a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java 
b/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
index 9a2ac12b..af035c7a 100644
--- a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
+++ b/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
@@ -19,8 +19,10 @@ package org.apache.spark.sql.auron;
 import java.lang.management.BufferPoolMXBean;
 import java.lang.management.ManagementFactory;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.auron.functions.AuronUDFWrapperContext;
 import org.apache.auron.hadoop.fs.FSDataInputWrapper;
 import org.apache.auron.hadoop.fs.FSDataInputWrapper$;
 import org.apache.auron.hadoop.fs.FSDataOutputWrapper;
@@ -121,4 +123,8 @@ public class JniBridge {
         TaskContextHelper$.MODULE$.setNativeThreadName();
         TaskContextHelper$.MODULE$.setHDFSCallerContext();
     }
+
+    public static AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer 
udfSerialized) {
+        throw new UnsupportedOperationException("This API is designed to 
support next-generation multi-engine.");
+    }
 }
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDFWrapperContext.scala
 
b/spark-extension/src/main/scala/org/apache/auron/spark/sql/SparkAuronUDFWrapperContext.scala
similarity index 83%
rename from 
spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDFWrapperContext.scala
rename to 
spark-extension/src/main/scala/org/apache/auron/spark/sql/SparkAuronUDFWrapperContext.scala
index 228079f0..9161a9d8 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDFWrapperContext.scala
+++ 
b/spark-extension/src/main/scala/org/apache/auron/spark/sql/SparkAuronUDFWrapperContext.scala
@@ -14,29 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.auron
+package org.apache.auron.spark.sql
 
 import java.nio.ByteBuffer
 
-import org.apache.arrow.c.ArrowArray
-import org.apache.arrow.c.Data
+import org.apache.arrow.c.{ArrowArray, Data}
 import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.dictionary.DictionaryProvider
 import 
org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.util.Using
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.expressions.Nondeterministic
-import org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
GenericInternalRow, Nondeterministic}
+import org.apache.spark.sql.execution.auron.arrowio.util.{ArrowUtils, 
ArrowWriter}
 import 
org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils.ROOT_ALLOCATOR
-import org.apache.spark.sql.execution.auron.arrowio.util.ArrowWriter
 import org.apache.spark.sql.execution.auron.columnar.ColumnarHelper
-import org.apache.spark.sql.types.StructField
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 
-case class SparkUDFWrapperContext(serialized: ByteBuffer) extends Logging {
+import org.apache.auron.functions.AuronUDFWrapperContext
+
+case class SparkAuronUDFWrapperContext(serialized: ByteBuffer)
+    extends AuronUDFWrapperContext
+    with Logging {
   private val (expr, javaParamsSchema) =
     NativeConverters.deserializeExpression[Expression, StructType]({
       val bytes = new Array[Byte](serialized.remaining())

Reply via email to