This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 5998f1cd [AURON #2102] Initializing JavaClasses in JNI and decoupling 
Spark (#2103)
5998f1cd is described below

commit 5998f1cd630acc4881786d7309b362c86e51f549
Author: zhangmang <[email protected]>
AuthorDate: Wed Mar 18 20:16:57 2026 +0800

    [AURON #2102] Initializing JavaClasses in JNI and decoupling Spark (#2103)
    
    # Which issue does this PR close?
    
    Closes #2102
    
    # Rationale for this change
    Many fields in JavaClasses are tightly coupled with Spark Java code; we
    decide whether to load the relevant code based on the engine.
    
    # What changes are included in this PR?
    * Introduce getEngineName API for `JniBridge` and `AuronAdaptor`
    * modify jni_bridge add engine type checking when initializing
    JavaClasses
    
    # Are there any user-facing changes?
    * No
    
    # How was this patch tested?
    * No
---
 .../java/org/apache/auron/jni/AuronAdaptor.java    |   5 +
 .../main/java/org/apache/auron/jni/JniBridge.java  |   4 +
 .../org/apache/auron/jni/MockAuronAdaptor.java     |   5 +
 .../org/apache/auron/jni/FlinkAuronAdaptor.java    |   5 +
 native-engine/auron-jni-bridge/src/jni_bridge.rs   | 153 +++++++++++++++++----
 native-engine/auron/src/metrics.rs                 |   8 +-
 native-engine/auron/src/rt.rs                      |   4 +-
 .../org/apache/auron/jni/SparkAuronAdaptor.java    |   5 +
 8 files changed, 160 insertions(+), 29 deletions(-)

diff --git a/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java 
b/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
index 463fb727..0162bef5 100644
--- a/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
+++ b/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
@@ -125,4 +125,9 @@ public abstract class AuronAdaptor {
      * @throws UnsupportedOperationException If the method is not implemented.
      */
     public abstract AuronUDFWrapperContext 
getAuronUDFWrapperContext(ByteBuffer udfSerialized);
+
+    /**
+     * Returns the name of the current engine, such as Spark or Flink.
+     */
+    public abstract String getEngineName();
 }
diff --git a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java 
b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
index 121970c2..d0853608 100644
--- a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
+++ b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
@@ -130,6 +130,10 @@ public class JniBridge {
         return getConfValue(confKey);
     }
 
+    public static String getEngineName() {
+        return AuronAdaptor.getInstance().getEngineName();
+    }
+
     static <T> T getConfValue(String confKey) {
         Class<? extends AuronConfiguration> confClass =
                 AuronAdaptor.getInstance().getAuronConfiguration().getClass();
diff --git 
a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java 
b/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
index 6cb2407f..61022941 100644
--- a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
+++ b/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
@@ -59,4 +59,9 @@ public class MockAuronAdaptor extends AuronAdaptor {
     public AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer 
udfSerialized) {
         return new MockAuronUDFWrapperContext(udfSerialized);
     }
+
+    @Override
+    public String getEngineName() {
+        return "Test";
+    }
 }
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/jni/FlinkAuronAdaptor.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/jni/FlinkAuronAdaptor.java
index 11a40498..ba95f93a 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/jni/FlinkAuronAdaptor.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/jni/FlinkAuronAdaptor.java
@@ -72,4 +72,9 @@ public class FlinkAuronAdaptor extends AuronAdaptor {
     public AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer 
byteBuffer) {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public String getEngineName() {
+        return "Flink";
+    }
 }
diff --git a/native-engine/auron-jni-bridge/src/jni_bridge.rs 
b/native-engine/auron-jni-bridge/src/jni_bridge.rs
index f058901a..a98ae7f5 100644
--- a/native-engine/auron-jni-bridge/src/jni_bridge.rs
+++ b/native-engine/auron-jni-bridge/src/jni_bridge.rs
@@ -442,21 +442,21 @@ pub struct JavaClasses<'a> {
 
     pub cSparkFileSegment: SparkFileSegment<'a>,
     pub cSparkSQLMetric: SparkSQLMetric<'a>,
-    pub cSparkMetricNode: SparkMetricNode<'a>,
     pub cSparkAuronUDFWrapperContext: SparkAuronUDFWrapperContext<'a>,
     pub cSparkUDAFWrapperContext: SparkUDAFWrapperContext<'a>,
     pub cSparkUDTFWrapperContext: SparkUDTFWrapperContext<'a>,
     pub cSparkUDAFMemTracker: SparkUDAFMemTracker<'a>,
     pub cAuronRssPartitionWriterBase: AuronRssPartitionWriterBase<'a>,
-    pub cAuronCallNativeWrapper: AuronCallNativeWrapper<'a>,
     pub cAuronOnHeapSpillManager: AuronOnHeapSpillManager<'a>,
     pub cAuronNativeParquetSinkUtils: AuronNativeParquetSinkUtils<'a>,
     pub cAuronBlockObject: AuronBlockObject<'a>,
+    pub cAuronJsonFallbackWrapper: AuronJsonFallbackWrapper<'a>,
+
     pub cAuronArrowFFIExporter: AuronArrowFFIExporter<'a>,
+    pub cAuronCallNativeWrapper: AuronCallNativeWrapper<'a>,
     pub cAuronFSDataInputWrapper: AuronFSDataInputWrapper<'a>,
     pub cAuronFSDataOutputWrapper: AuronFSDataOutputWrapper<'a>,
-    pub cAuronJsonFallbackWrapper: AuronJsonFallbackWrapper<'a>,
-
+    pub cMetricNode: MetricNode<'a>,
     pub cAuronUDFWrapperContext: AuronUDFWrapperContext<'a>,
 }
 
@@ -481,6 +481,61 @@ impl JavaClasses<'static> {
                 )?
                 .l()?;
 
+            let engine_name_java = env
+                .call_static_method_unchecked(
+                    jni_bridge.class,
+                    jni_bridge.method_getEngineName,
+                    jni_bridge.method_getEngineName_ret.clone(),
+                    &[],
+                )?
+                .l()?;
+            let engine_name = env
+                .get_string(engine_name_java.into())
+                .map(|s| String::from(s))
+                .expect("engine_name is not valid");
+            log::info!("Runtime engine is {engine_name}");
+
+            let (
+                c_spark_file_segment,
+                c_spark_sql_metric,
+                c_spark_auron_udf_wrapper_context,
+                c_spark_udaf_wrapper_context,
+                c_spark_udtf_wrapper_context,
+                c_spark_udaf_mem_tracker,
+                c_auron_rss_partition_writer_base,
+                c_auron_on_heap_spill_manager,
+                c_auron_native_parquet_sink_utils,
+                c_auron_block_object,
+                c_auron_json_fallback_wrapper,
+            ) = match engine_name.as_str() {
+                "Spark" => (
+                    SparkFileSegment::new(env)?,
+                    SparkSQLMetric::new(env)?,
+                    SparkAuronUDFWrapperContext::new(env)?,
+                    SparkUDAFWrapperContext::new(env)?,
+                    SparkUDTFWrapperContext::new(env)?,
+                    SparkUDAFMemTracker::new(env)?,
+                    AuronRssPartitionWriterBase::new(env)?,
+                    AuronOnHeapSpillManager::new(env)?,
+                    AuronNativeParquetSinkUtils::new(env)?,
+                    AuronBlockObject::new(env)?,
+                    AuronJsonFallbackWrapper::new(env)?,
+                ),
+                _ => (
+                    SparkFileSegment::default(),
+                    SparkSQLMetric::default(),
+                    SparkAuronUDFWrapperContext::default(),
+                    SparkUDAFWrapperContext::default(),
+                    SparkUDTFWrapperContext::default(),
+                    SparkUDAFMemTracker::default(),
+                    AuronRssPartitionWriterBase::default(),
+                    AuronOnHeapSpillManager::default(),
+                    AuronNativeParquetSinkUtils::default(),
+                    AuronBlockObject::default(),
+                    AuronJsonFallbackWrapper::default(),
+                ),
+            };
+
             let java_classes = JavaClasses {
                 jvm: env.get_java_vm()?,
                 classloader: get_global_ref_jobject(env, classloader)?,
@@ -505,23 +560,23 @@ impl JavaClasses<'static> {
                 cHadoopFileSystem: HadoopFileSystem::new(env)?,
                 cHadoopPath: HadoopPath::new(env)?,
 
-                cSparkFileSegment: SparkFileSegment::new(env)?,
-                cSparkSQLMetric: SparkSQLMetric::new(env)?,
-                cSparkMetricNode: SparkMetricNode::new(env)?,
-                cSparkAuronUDFWrapperContext: 
SparkAuronUDFWrapperContext::new(env)?,
-                cSparkUDAFWrapperContext: SparkUDAFWrapperContext::new(env)?,
-                cSparkUDTFWrapperContext: SparkUDTFWrapperContext::new(env)?,
-                cSparkUDAFMemTracker: SparkUDAFMemTracker::new(env)?,
-                cAuronRssPartitionWriterBase: 
AuronRssPartitionWriterBase::new(env)?,
-                cAuronCallNativeWrapper: AuronCallNativeWrapper::new(env)?,
-                cAuronOnHeapSpillManager: AuronOnHeapSpillManager::new(env)?,
-                cAuronNativeParquetSinkUtils: 
AuronNativeParquetSinkUtils::new(env)?,
-                cAuronBlockObject: AuronBlockObject::new(env)?,
+                cSparkFileSegment: c_spark_file_segment,
+                cSparkSQLMetric: c_spark_sql_metric,
+                cSparkAuronUDFWrapperContext: 
c_spark_auron_udf_wrapper_context,
+                cSparkUDAFWrapperContext: c_spark_udaf_wrapper_context,
+                cSparkUDTFWrapperContext: c_spark_udtf_wrapper_context,
+                cSparkUDAFMemTracker: c_spark_udaf_mem_tracker,
+                cAuronRssPartitionWriterBase: 
c_auron_rss_partition_writer_base,
+                cAuronOnHeapSpillManager: c_auron_on_heap_spill_manager,
+                cAuronNativeParquetSinkUtils: 
c_auron_native_parquet_sink_utils,
+                cAuronBlockObject: c_auron_block_object,
+                cAuronJsonFallbackWrapper: c_auron_json_fallback_wrapper,
+
                 cAuronArrowFFIExporter: AuronArrowFFIExporter::new(env)?,
+                cAuronCallNativeWrapper: AuronCallNativeWrapper::new(env)?,
                 cAuronFSDataInputWrapper: AuronFSDataInputWrapper::new(env)?,
                 cAuronFSDataOutputWrapper: AuronFSDataOutputWrapper::new(env)?,
-                cAuronJsonFallbackWrapper: AuronJsonFallbackWrapper::new(env)?,
-
+                cMetricNode: MetricNode::new(env)?,
                 cAuronUDFWrapperContext: AuronUDFWrapperContext::new(env)?,
             };
             log::info!("Initializing JavaClasses finished");
@@ -587,6 +642,8 @@ pub struct JniBridge<'a> {
     pub method_booleanConf_ret: ReturnType,
     pub method_stringConf: JStaticMethodID,
     pub method_stringConf_ret: ReturnType,
+    pub method_getEngineName: JStaticMethodID,
+    pub method_getEngineName_ret: ReturnType,
 }
 impl<'a> JniBridge<'a> {
     pub const SIG_TYPE: &'static str = "org/apache/auron/jni/JniBridge";
@@ -700,6 +757,12 @@ impl<'a> JniBridge<'a> {
                 "(Ljava/lang/String;)Ljava/lang/String;",
             )?,
             method_stringConf_ret: ReturnType::Object,
+            method_getEngineName: env.get_static_method_id(
+                class,
+                "getEngineName",
+                "()Ljava/lang/String;",
+            )?,
+            method_getEngineName_ret: ReturnType::Object,
         })
     }
 }
@@ -1110,6 +1173,10 @@ impl<'a> SparkFileSegment<'a> {
             method_length_ret: ReturnType::Primitive(Primitive::Long),
         })
     }
+
+    fn default() -> Self {
+        unsafe { std::mem::zeroed() }
+    }
 }
 
 #[allow(non_snake_case)]
@@ -1129,22 +1196,26 @@ impl<'a> SparkSQLMetric<'a> {
             method_add_ret: ReturnType::Primitive(Primitive::Void),
         })
     }
+
+    fn default() -> Self {
+        unsafe { std::mem::zeroed() }
+    }
 }
 
 #[allow(non_snake_case)]
-pub struct SparkMetricNode<'a> {
+pub struct MetricNode<'a> {
     pub class: JClass<'a>,
     pub method_getChild: JMethodID,
     pub method_getChild_ret: ReturnType,
     pub method_add: JMethodID,
     pub method_add_ret: ReturnType,
 }
-impl<'a> SparkMetricNode<'a> {
-    pub const SIG_TYPE: &'static str = 
"org/apache/auron/metric/SparkMetricNode";
+impl<'a> MetricNode<'a> {
+    pub const SIG_TYPE: &'static str = "org/apache/auron/metric/MetricNode";
 
-    pub fn new(env: &JNIEnv<'a>) -> JniResult<SparkMetricNode<'a>> {
+    pub fn new(env: &JNIEnv<'a>) -> JniResult<MetricNode<'a>> {
         let class = get_global_jclass(env, Self::SIG_TYPE)?;
-        Ok(SparkMetricNode {
+        Ok(MetricNode {
             class,
             method_getChild: env.get_method_id(
                 class,
@@ -1181,6 +1252,10 @@ impl<'a> AuronRssPartitionWriterBase<'_> {
             method_flush_ret: ReturnType::Primitive(Primitive::Void),
         })
     }
+
+    fn default() -> Self {
+        unsafe { std::mem::zeroed() }
+    }
 }
 
 #[allow(non_snake_case)]
@@ -1202,6 +1277,10 @@ impl<'a> SparkAuronUDFWrapperContext<'a> {
             method_eval_ret: ReturnType::Primitive(Primitive::Void),
         })
     }
+
+    fn default() -> Self {
+        unsafe { std::mem::zeroed() }
+    }
 }
 
 #[allow(non_snake_case)]
@@ -1318,6 +1397,10 @@ impl<'a> SparkUDAFWrapperContext<'a> {
             method_unspill_ret: ReturnType::Object,
         })
     }
+
+    fn default() -> Self {
+        unsafe { std::mem::zeroed() }
+    }
 }
 
 #[allow(non_snake_case)]
@@ -1347,6 +1430,10 @@ impl<'a> SparkUDTFWrapperContext<'a> {
             method_terminateLoop_ret: ReturnType::Primitive(Primitive::Void),
         })
     }
+
+    fn default() -> Self {
+        unsafe { std::mem::zeroed() }
+    }
 }
 
 #[allow(non_snake_case)]
@@ -1380,6 +1467,10 @@ impl<'a> SparkUDAFMemTracker<'a> {
             method_updateUsed_ret: ReturnType::Primitive(Primitive::Boolean),
         })
     }
+
+    fn default() -> Self {
+        unsafe { std::mem::zeroed() }
+    }
 }
 
 #[allow(non_snake_case)]
@@ -1470,6 +1561,10 @@ impl<'a> AuronOnHeapSpillManager<'a> {
             method_releaseSpill_ret: ReturnType::Primitive(Primitive::Void),
         })
     }
+
+    fn default() -> Self {
+        unsafe { std::mem::zeroed() }
+    }
 }
 
 #[allow(non_snake_case)]
@@ -1502,6 +1597,10 @@ impl<'a> AuronNativeParquetSinkUtils<'a> {
             method_completeOutput_ret: ReturnType::Primitive(Primitive::Void),
         })
     }
+
+    fn default() -> Self {
+        unsafe { std::mem::zeroed() }
+    }
 }
 
 #[allow(non_snake_case)]
@@ -1562,6 +1661,10 @@ impl<'a> AuronBlockObject<'a> {
             method_throwFetchFailed_ret: 
ReturnType::Primitive(Primitive::Void),
         })
     }
+
+    fn default() -> Self {
+        unsafe { std::mem::zeroed() }
+    }
 }
 
 #[allow(non_snake_case)]
@@ -1648,6 +1751,10 @@ impl<'a> AuronJsonFallbackWrapper<'a> {
             method_parseJsons_ret: ReturnType::Primitive(Primitive::Void),
         })
     }
+
+    fn default() -> Self {
+        unsafe { std::mem::zeroed() }
+    }
 }
 
 fn get_global_jclass(env: &JNIEnv<'_>, cls: &str) -> 
JniResult<JClass<'static>> {
diff --git a/native-engine/auron/src/metrics.rs 
b/native-engine/auron/src/metrics.rs
index e52ee4bb..30d957a3 100644
--- a/native-engine/auron/src/metrics.rs
+++ b/native-engine/auron/src/metrics.rs
@@ -19,7 +19,7 @@ use auron_jni_bridge::{jni_call, jni_new_string};
 use datafusion::{common::Result, physical_plan::ExecutionPlan};
 use jni::objects::JObject;
 
-pub fn update_spark_metric_node(
+pub fn update_metric_node(
     metric_node: JObject,
     execution_plan: Arc<dyn ExecutionPlan>,
 ) -> Result<()> {
@@ -42,9 +42,9 @@ pub fn update_spark_metric_node(
     // update children nodes
     for (i, &child_plan) in execution_plan.children().iter().enumerate() {
         let child_metric_node = jni_call!(
-            SparkMetricNode(metric_node).getChild(i as i32) -> JObject
+            MetricNode(metric_node).getChild(i as i32) -> JObject
         )?;
-        update_spark_metric_node(child_metric_node.as_obj(), 
child_plan.clone())?;
+        update_metric_node(child_metric_node.as_obj(), child_plan.clone())?;
     }
     Ok(())
 }
@@ -52,7 +52,7 @@ pub fn update_spark_metric_node(
 fn update_metrics(metric_node: JObject, metric_values: &[(&str, i64)]) -> 
Result<()> {
     for &(name, value) in metric_values {
         let jname = jni_new_string!(&name)?;
-        jni_call!(SparkMetricNode(metric_node).add(jname.as_obj(), value) -> 
())?;
+        jni_call!(MetricNode(metric_node).add(jname.as_obj(), value) -> ())?;
     }
     Ok(())
 }
diff --git a/native-engine/auron/src/rt.rs b/native-engine/auron/src/rt.rs
index 005aac1d..29a0f180 100644
--- a/native-engine/auron/src/rt.rs
+++ b/native-engine/auron/src/rt.rs
@@ -57,7 +57,7 @@ use tokio::{runtime::Runtime, task::JoinHandle};
 use crate::{
     handle_unwinded_scope,
     logging::{THREAD_PARTITION_ID, THREAD_STAGE_ID, THREAD_TID},
-    metrics::update_spark_metric_node,
+    metrics::update_metric_node,
 };
 
 pub struct NativeExecutionRuntime {
@@ -301,7 +301,7 @@ impl NativeExecutionRuntime {
         let metrics = jni_call!(
             AuronCallNativeWrapper(self.native_wrapper.as_obj()).getMetrics() 
-> JObject
         )?;
-        update_spark_metric_node(metrics.as_obj(), self.plan.clone())?;
+        update_metric_node(metrics.as_obj(), self.plan.clone())?;
         Ok(())
     }
 }
diff --git 
a/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java 
b/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
index 005e9753..3bf45cb5 100644
--- a/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
+++ b/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
@@ -94,4 +94,9 @@ public class SparkAuronAdaptor extends AuronAdaptor {
     public AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer 
udfSerialized) {
         return new SparkAuronUDFWrapperContext(udfSerialized);
     }
+
+    @Override
+    public String getEngineName() {
+        return "Spark";
+    }
 }

Reply via email to