This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 05eca619 make MD5 output back to utf8 (#1605)
05eca619 is described below

commit 05eca619b1f4e2f7ee0a87b1cc2010273860c7ef
Author: Flyangz <[email protected]>
AuthorDate: Thu Nov 13 17:16:54 2025 +0800

    make MD5 output back to utf8 (#1605)
---
 native-engine/auron-serde/proto/auron.proto        |  2 +-
 native-engine/auron-serde/src/from_proto.rs        |  2 +-
 native-engine/datafusion-ext-functions/src/lib.rs  | 11 ++---
 .../src/{spark_sha2.rs => spark_crypto.rs}         | 50 ++++++++++++++++------
 .../spark/sql/auron/AuronFunctionSuite.scala       | 17 ++++++++
 .../apache/spark/sql/auron/NativeConverters.scala  |  2 +-
 6 files changed, 63 insertions(+), 21 deletions(-)

diff --git a/native-engine/auron-serde/proto/auron.proto b/native-engine/auron-serde/proto/auron.proto
index 5a4076f2..29432c1e 100644
--- a/native-engine/auron-serde/proto/auron.proto
+++ b/native-engine/auron-serde/proto/auron.proto
@@ -234,7 +234,7 @@ enum ScalarFunction {
   Lpad=32;
   Lower=33;
   Ltrim=34;
-  MD5=35;
+  // MD5=35;
   // NullIf=36;
   OctetLength=37;
   Random=38;
diff --git a/native-engine/auron-serde/src/from_proto.rs b/native-engine/auron-serde/src/from_proto.rs
index 45071360..7d051ecd 100644
--- a/native-engine/auron-serde/src/from_proto.rs
+++ b/native-engine/auron-serde/src/from_proto.rs
@@ -788,7 +788,7 @@ impl From<protobuf::ScalarFunction> for Arc<ScalarUDF> {
             ScalarFunction::Nvl => f::core::nvl(),
             ScalarFunction::DatePart => f::datetime::date_part(),
             ScalarFunction::DateTrunc => f::datetime::date_trunc(),
-            ScalarFunction::Md5 => f::crypto::md5(),
+            // ScalarFunction::Md5 => f::crypto::md5(),
             // ScalarFunction::Sha224 => f::crypto::sha224(),
             // ScalarFunction::Sha256 => f::crypto::sha256(),
             // ScalarFunction::Sha384 => f::crypto::sha384(),
diff --git a/native-engine/datafusion-ext-functions/src/lib.rs b/native-engine/datafusion-ext-functions/src/lib.rs
index c48c2871..0743d26c 100644
--- a/native-engine/datafusion-ext-functions/src/lib.rs
+++ b/native-engine/datafusion-ext-functions/src/lib.rs
@@ -20,6 +20,7 @@ use datafusion_ext_commons::df_unimplemented_err;
 
 mod brickhouse;
 mod spark_check_overflow;
+mod spark_crypto;
 mod spark_dates;
 pub mod spark_get_json_object;
 mod spark_hash;
@@ -28,7 +29,6 @@ mod spark_make_decimal;
 mod spark_normalize_nan_and_zero;
 mod spark_null_if;
 mod spark_round;
-mod spark_sha2;
 mod spark_strings;
 mod spark_unscaled_value;
 
@@ -45,10 +45,11 @@ pub fn create_auron_ext_function(name: &str) -> Result<ScalarFunctionImplementat
         "Spark_CheckOverflow" => Arc::new(spark_check_overflow::spark_check_overflow),
         "Spark_Murmur3Hash" => Arc::new(spark_hash::spark_murmur3_hash),
         "Spark_XxHash64" => Arc::new(spark_hash::spark_xxhash64),
-        "Spark_Sha224" => Arc::new(spark_sha2::spark_sha224),
-        "Spark_Sha256" => Arc::new(spark_sha2::spark_sha256),
-        "Spark_Sha384" => Arc::new(spark_sha2::spark_sha384),
-        "Spark_Sha512" => Arc::new(spark_sha2::spark_sha512),
+        "Sha224" => Arc::new(spark_crypto::spark_sha224),
+        "Sha256" => Arc::new(spark_crypto::spark_sha256),
+        "Sha384" => Arc::new(spark_crypto::spark_sha384),
+        "Sha512" => Arc::new(spark_crypto::spark_sha512),
+        "MD5" => Arc::new(spark_crypto::spark_md5),
         "Spark_GetJsonObject" => Arc::new(spark_get_json_object::spark_get_json_object),
         "Spark_GetParsedJsonObject" => {
             Arc::new(spark_get_json_object::spark_get_parsed_json_object)
diff --git a/native-engine/datafusion-ext-functions/src/spark_sha2.rs b/native-engine/datafusion-ext-functions/src/spark_crypto.rs
similarity index 80%
rename from native-engine/datafusion-ext-functions/src/spark_sha2.rs
rename to native-engine/datafusion-ext-functions/src/spark_crypto.rs
index 741c1b3e..fe6dd184 100644
--- a/native-engine/datafusion-ext-functions/src/spark_sha2.rs
+++ b/native-engine/datafusion-ext-functions/src/spark_crypto.rs
@@ -20,8 +20,11 @@ use arrow::{
     datatypes::{DataType, Field},
 };
 use datafusion::{
-    common::{Result, ScalarValue, cast::as_binary_array},
-    functions::crypto::{sha224, sha256, sha384, sha512},
+    common::{Result, ScalarValue, cast::as_binary_array, utils::take_function_args},
+    functions::crypto::{
+        basic::{DigestAlgorithm, digest_process},
+        sha224, sha256, sha384, sha512,
+    },
     logical_expr::{ScalarFunctionArgs, ScalarUDF},
     physical_plan::ColumnarValue,
 };
@@ -30,39 +33,46 @@ use datafusion_ext_commons::df_execution_err;
 /// `sha224` function that simulates Spark's `sha2` expression with bit width
 /// 224
 pub fn spark_sha224(args: &[ColumnarValue]) -> Result<ColumnarValue> {
-    wrap_digest_result_as_hex_string(args, sha224())
+    digest_and_wrap_as_hex(args, sha224())
 }
 
 /// `sha256` function that simulates Spark's `sha2` expression with bit width 0
 /// or 256
 pub fn spark_sha256(args: &[ColumnarValue]) -> Result<ColumnarValue> {
-    wrap_digest_result_as_hex_string(args, sha256())
+    digest_and_wrap_as_hex(args, sha256())
 }
 
 /// `sha384` function that simulates Spark's `sha2` expression with bit width
 /// 384
 pub fn spark_sha384(args: &[ColumnarValue]) -> Result<ColumnarValue> {
-    wrap_digest_result_as_hex_string(args, sha384())
+    digest_and_wrap_as_hex(args, sha384())
 }
 
 /// `sha512` function that simulates Spark's `sha2` expression with bit width
 /// 512
 pub fn spark_sha512(args: &[ColumnarValue]) -> Result<ColumnarValue> {
-    wrap_digest_result_as_hex_string(args, sha512())
+    digest_and_wrap_as_hex(args, sha512())
 }
 
-/// Spark requires hex string as the result of sha2 functions, we have to wrap
-/// the result of digest functions as hex string
-fn wrap_digest_result_as_hex_string(
-    args: &[ColumnarValue],
-    digest: Arc<ScalarUDF>,
-) -> Result<ColumnarValue> {
+pub fn spark_md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let [data] = take_function_args("md5", args)?;
+    let value = digest_process(data, DigestAlgorithm::Md5)?;
+    to_hex_string(value)
+}
+
+/// Spark requires hex string as the result of sha2 and md5 functions, we have
+/// to wrap the result of digest functions as hex string
+fn digest_and_wrap_as_hex(args: &[ColumnarValue], digest: Arc<ScalarUDF>) -> Result<ColumnarValue> {
     let value = digest.inner().invoke_with_args(ScalarFunctionArgs {
         args: args.to_vec(),
         arg_fields: vec![Arc::new(Field::new("arg", DataType::Binary, true))],
         number_rows: 0,
         return_field: Arc::new(Field::new("result", DataType::Utf8, true)),
     })?;
+    to_hex_string(value)
+}
+
+fn to_hex_string(value: ColumnarValue) -> Result<ColumnarValue> {
     Ok(match value {
         ColumnarValue::Array(array) => {
             let binary_array = as_binary_array(&array)?;
@@ -102,7 +112,7 @@ mod tests {
         common::ScalarValue, error::Result as DataFusionResult, physical_plan::ColumnarValue,
     };
 
-    use crate::spark_sha2::{spark_sha224, spark_sha256, spark_sha384, spark_sha512};
+    use crate::spark_crypto::{spark_md5, spark_sha224, spark_sha256, spark_sha384, spark_sha512};
 
     /// Helper function to run a test for a given hash function and scalar
     /// input.
@@ -181,4 +191,18 @@ mod tests {
         let expected = "178d767c364244ede054ebb3cc4af0ac2b307a86fba6a32706ce4f692642674d2ab8f51ee738ecb09bc296918aa85db48abe28fcaef7aa2da81a618cc6d891c3";
         run_scalar_test(spark_sha512, input, expected)
     }
+
+    #[test]
+    fn test_md5_scalar_utf8() -> Result<(), Box<dyn Error>> {
+        let input = ColumnarValue::Scalar(ScalarValue::Utf8(Some("ABC".to_string())));
+        let expected = "902fbdd2b1df0c4f70b4a5d23525e932";
+        run_scalar_test(spark_md5, input, expected)
+    }
+
+    #[test]
+    fn test_md5_scalar_binary() -> Result<(), Box<dyn Error>> {
+        let input = ColumnarValue::Scalar(ScalarValue::Binary(Some(vec![1, 2, 3, 4, 5, 6])));
+        let expected = "6ac1e56bc78f031059be7be854522c4c";
+        run_scalar_test(spark_md5, input, expected)
+    }
 }
diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
index 99348978..5e09dc78 100644
--- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
@@ -64,6 +64,23 @@ class AuronFunctionSuite
     }
   }
 
+  test("md5 function") {
+    withTable("t1") {
+      sql("create table t1 using parquet as select 'spark' as c1, '3.x' as version")
+      val functions =
+        """
+          |select b.md5
+          |from (
+          |  select c1, version from t1
+          |) a join (
+          |  select md5(concat(c1, version)) as md5 from t1
+          |) b on md5(concat(a.c1, a.version)) = b.md5
+          |""".stripMargin
+      val df = sql(functions)
+      checkAnswer(df, Seq(Row("9ff36a3857e29335d03cf6bef2147119")))
+    }
+  }
+
   test("spark hash function") {
     withTable("t1") {
       sql("create table t1 using parquet as select array(1, 2) as arr")
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index 44643801..d98121a9 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -864,7 +864,7 @@ object NativeConverters extends Logging {
       case e @ NullIf(left, right, _) =>
         buildExtScalarFunction("Spark_NullIf", left :: right :: Nil, e.dataType)
       case Md5(_1) =>
-        buildScalarFunction(pb.ScalarFunction.MD5, Seq(unpackBinaryTypeCast(_1)), StringType)
+        buildExtScalarFunction("MD5", Seq(unpackBinaryTypeCast(_1)), StringType)
       case Reverse(_1) =>
         buildScalarFunction(pb.ScalarFunction.Reverse, Seq(unpackBinaryTypeCast(_1)), StringType)
       case InitCap(_1) =>

Reply via email to