This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 05eca619 make MD5 output back to utf8 (#1605)
05eca619 is described below
commit 05eca619b1f4e2f7ee0a87b1cc2010273860c7ef
Author: Flyangz <[email protected]>
AuthorDate: Thu Nov 13 17:16:54 2025 +0800
make MD5 output back to utf8 (#1605)
---
native-engine/auron-serde/proto/auron.proto | 2 +-
native-engine/auron-serde/src/from_proto.rs | 2 +-
native-engine/datafusion-ext-functions/src/lib.rs | 11 ++---
.../src/{spark_sha2.rs => spark_crypto.rs} | 50 ++++++++++++++++------
.../spark/sql/auron/AuronFunctionSuite.scala | 17 ++++++++
.../apache/spark/sql/auron/NativeConverters.scala | 2 +-
6 files changed, 63 insertions(+), 21 deletions(-)
diff --git a/native-engine/auron-serde/proto/auron.proto b/native-engine/auron-serde/proto/auron.proto
index 5a4076f2..29432c1e 100644
--- a/native-engine/auron-serde/proto/auron.proto
+++ b/native-engine/auron-serde/proto/auron.proto
@@ -234,7 +234,7 @@ enum ScalarFunction {
Lpad=32;
Lower=33;
Ltrim=34;
- MD5=35;
+ // MD5=35;
// NullIf=36;
OctetLength=37;
Random=38;
diff --git a/native-engine/auron-serde/src/from_proto.rs b/native-engine/auron-serde/src/from_proto.rs
index 45071360..7d051ecd 100644
--- a/native-engine/auron-serde/src/from_proto.rs
+++ b/native-engine/auron-serde/src/from_proto.rs
@@ -788,7 +788,7 @@ impl From<protobuf::ScalarFunction> for Arc<ScalarUDF> {
ScalarFunction::Nvl => f::core::nvl(),
ScalarFunction::DatePart => f::datetime::date_part(),
ScalarFunction::DateTrunc => f::datetime::date_trunc(),
- ScalarFunction::Md5 => f::crypto::md5(),
+ // ScalarFunction::Md5 => f::crypto::md5(),
// ScalarFunction::Sha224 => f::crypto::sha224(),
// ScalarFunction::Sha256 => f::crypto::sha256(),
// ScalarFunction::Sha384 => f::crypto::sha384(),
diff --git a/native-engine/datafusion-ext-functions/src/lib.rs b/native-engine/datafusion-ext-functions/src/lib.rs
index c48c2871..0743d26c 100644
--- a/native-engine/datafusion-ext-functions/src/lib.rs
+++ b/native-engine/datafusion-ext-functions/src/lib.rs
@@ -20,6 +20,7 @@ use datafusion_ext_commons::df_unimplemented_err;
mod brickhouse;
mod spark_check_overflow;
+mod spark_crypto;
mod spark_dates;
pub mod spark_get_json_object;
mod spark_hash;
@@ -28,7 +29,6 @@ mod spark_make_decimal;
mod spark_normalize_nan_and_zero;
mod spark_null_if;
mod spark_round;
-mod spark_sha2;
mod spark_strings;
mod spark_unscaled_value;
@@ -45,10 +45,11 @@ pub fn create_auron_ext_function(name: &str) -> Result<ScalarFunctionImplementat
"Spark_CheckOverflow" =>
Arc::new(spark_check_overflow::spark_check_overflow),
"Spark_Murmur3Hash" => Arc::new(spark_hash::spark_murmur3_hash),
"Spark_XxHash64" => Arc::new(spark_hash::spark_xxhash64),
- "Spark_Sha224" => Arc::new(spark_sha2::spark_sha224),
- "Spark_Sha256" => Arc::new(spark_sha2::spark_sha256),
- "Spark_Sha384" => Arc::new(spark_sha2::spark_sha384),
- "Spark_Sha512" => Arc::new(spark_sha2::spark_sha512),
+ "Sha224" => Arc::new(spark_crypto::spark_sha224),
+ "Sha256" => Arc::new(spark_crypto::spark_sha256),
+ "Sha384" => Arc::new(spark_crypto::spark_sha384),
+ "Sha512" => Arc::new(spark_crypto::spark_sha512),
+ "MD5" => Arc::new(spark_crypto::spark_md5),
"Spark_GetJsonObject" =>
Arc::new(spark_get_json_object::spark_get_json_object),
"Spark_GetParsedJsonObject" => {
Arc::new(spark_get_json_object::spark_get_parsed_json_object)
diff --git a/native-engine/datafusion-ext-functions/src/spark_sha2.rs b/native-engine/datafusion-ext-functions/src/spark_crypto.rs
similarity index 80%
rename from native-engine/datafusion-ext-functions/src/spark_sha2.rs
rename to native-engine/datafusion-ext-functions/src/spark_crypto.rs
index 741c1b3e..fe6dd184 100644
--- a/native-engine/datafusion-ext-functions/src/spark_sha2.rs
+++ b/native-engine/datafusion-ext-functions/src/spark_crypto.rs
@@ -20,8 +20,11 @@ use arrow::{
datatypes::{DataType, Field},
};
use datafusion::{
- common::{Result, ScalarValue, cast::as_binary_array},
- functions::crypto::{sha224, sha256, sha384, sha512},
+ common::{Result, ScalarValue, cast::as_binary_array,
utils::take_function_args},
+ functions::crypto::{
+ basic::{DigestAlgorithm, digest_process},
+ sha224, sha256, sha384, sha512,
+ },
logical_expr::{ScalarFunctionArgs, ScalarUDF},
physical_plan::ColumnarValue,
};
@@ -30,39 +33,46 @@ use datafusion_ext_commons::df_execution_err;
/// `sha224` function that simulates Spark's `sha2` expression with bit width
/// 224
pub fn spark_sha224(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- wrap_digest_result_as_hex_string(args, sha224())
+ digest_and_wrap_as_hex(args, sha224())
}
/// `sha256` function that simulates Spark's `sha2` expression with bit width 0
/// or 256
pub fn spark_sha256(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- wrap_digest_result_as_hex_string(args, sha256())
+ digest_and_wrap_as_hex(args, sha256())
}
/// `sha384` function that simulates Spark's `sha2` expression with bit width
/// 384
pub fn spark_sha384(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- wrap_digest_result_as_hex_string(args, sha384())
+ digest_and_wrap_as_hex(args, sha384())
}
/// `sha512` function that simulates Spark's `sha2` expression with bit width
/// 512
pub fn spark_sha512(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- wrap_digest_result_as_hex_string(args, sha512())
+ digest_and_wrap_as_hex(args, sha512())
}
-/// Spark requires hex string as the result of sha2 functions, we have to wrap
-/// the result of digest functions as hex string
-fn wrap_digest_result_as_hex_string(
- args: &[ColumnarValue],
- digest: Arc<ScalarUDF>,
-) -> Result<ColumnarValue> {
+pub fn spark_md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ let [data] = take_function_args("md5", args)?;
+ let value = digest_process(data, DigestAlgorithm::Md5)?;
+ to_hex_string(value)
+}
+
+/// Spark requires hex string as the result of sha2 and md5 functions, we have
+/// to wrap the result of digest functions as hex string
+fn digest_and_wrap_as_hex(args: &[ColumnarValue], digest: Arc<ScalarUDF>) ->
Result<ColumnarValue> {
let value = digest.inner().invoke_with_args(ScalarFunctionArgs {
args: args.to_vec(),
arg_fields: vec![Arc::new(Field::new("arg", DataType::Binary, true))],
number_rows: 0,
return_field: Arc::new(Field::new("result", DataType::Utf8, true)),
})?;
+ to_hex_string(value)
+}
+
+fn to_hex_string(value: ColumnarValue) -> Result<ColumnarValue> {
Ok(match value {
ColumnarValue::Array(array) => {
let binary_array = as_binary_array(&array)?;
@@ -102,7 +112,7 @@ mod tests {
common::ScalarValue, error::Result as DataFusionResult,
physical_plan::ColumnarValue,
};
- use crate::spark_sha2::{spark_sha224, spark_sha256, spark_sha384,
spark_sha512};
+ use crate::spark_crypto::{spark_md5, spark_sha224, spark_sha256,
spark_sha384, spark_sha512};
/// Helper function to run a test for a given hash function and scalar
/// input.
@@ -181,4 +191,18 @@ mod tests {
let expected =
"178d767c364244ede054ebb3cc4af0ac2b307a86fba6a32706ce4f692642674d2ab8f51ee738ecb09bc296918aa85db48abe28fcaef7aa2da81a618cc6d891c3";
run_scalar_test(spark_sha512, input, expected)
}
+
+ #[test]
+ fn test_md5_scalar_utf8() -> Result<(), Box<dyn Error>> {
+ let input =
ColumnarValue::Scalar(ScalarValue::Utf8(Some("ABC".to_string())));
+ let expected = "902fbdd2b1df0c4f70b4a5d23525e932";
+ run_scalar_test(spark_md5, input, expected)
+ }
+
+ #[test]
+ fn test_md5_scalar_binary() -> Result<(), Box<dyn Error>> {
+ let input = ColumnarValue::Scalar(ScalarValue::Binary(Some(vec![1, 2,
3, 4, 5, 6])));
+ let expected = "6ac1e56bc78f031059be7be854522c4c";
+ run_scalar_test(spark_md5, input, expected)
+ }
}
diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
index 99348978..5e09dc78 100644
--- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala
@@ -64,6 +64,23 @@ class AuronFunctionSuite
}
}
+ test("md5 function") {
+ withTable("t1") {
+ sql("create table t1 using parquet as select 'spark' as c1, '3.x' as
version")
+ val functions =
+ """
+ |select b.md5
+ |from (
+ | select c1, version from t1
+ |) a join (
+ | select md5(concat(c1, version)) as md5 from t1
+ |) b on md5(concat(a.c1, a.version)) = b.md5
+ |""".stripMargin
+ val df = sql(functions)
+ checkAnswer(df, Seq(Row("9ff36a3857e29335d03cf6bef2147119")))
+ }
+ }
+
test("spark hash function") {
withTable("t1") {
sql("create table t1 using parquet as select array(1, 2) as arr")
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index 44643801..d98121a9 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -864,7 +864,7 @@ object NativeConverters extends Logging {
case e @ NullIf(left, right, _) =>
buildExtScalarFunction("Spark_NullIf", left :: right :: Nil,
e.dataType)
case Md5(_1) =>
- buildScalarFunction(pb.ScalarFunction.MD5,
Seq(unpackBinaryTypeCast(_1)), StringType)
+ buildExtScalarFunction("MD5", Seq(unpackBinaryTypeCast(_1)),
StringType)
case Reverse(_1) =>
buildScalarFunction(pb.ScalarFunction.Reverse,
Seq(unpackBinaryTypeCast(_1)), StringType)
case InitCap(_1) =>