mbutrovich commented on code in PR #4763:
URL: https://github.com/apache/datafusion-comet/pull/4763#discussion_r3499756808


##########
native/spark-expr/src/conversion_funcs/cast.rs:
##########


Review Comment:
   This looks like the same `UTF8String.fromBytes` operation as the cast path 
you just fixed, one formatter over. As far as I can tell it is reachable in 
production: with `spark.sql.binaryOutputStyle=UTF8` (Spark 4.0+), a 
`ToPrettyString` / `show()` over a binary column whose bytes are not valid 
UTF-8 would land here and hit the `unwrap()`, which panics the executor rather 
than rendering like Spark. Spark's `ToStringBase` UTF8 `BinaryFormatter` goes 
through the same `fromBytes` plus replacement, so I think it has the same 
`U+FFFD` semantics as the plain cast.
   
   Since `decode_utf8_spark_lossy` is now shared, would it make sense to route 
this arm through it as well so both binary-to-string formatters behave the same?
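
A minimal sketch of the two behaviours, using only the standard library. `render_strict` and `render_lossy` are hypothetical names, and the `unwrap()` shape of the current arm is an assumption based on the description above. `String::from_utf8_lossy` stands in for `decode_utf8_spark_lossy`: it does not reproduce the JDK's replacement counts, so this only illustrates "panic" versus "replace", not the exact rendered output.

```rust
use std::borrow::Cow;

/// Hypothetical stand-in for the formatter arm as described: strict UTF-8
/// validation followed by `unwrap()`, which panics on ill-formed input.
fn render_strict(value: &[u8]) -> String {
    String::from_utf8(value.to_vec()).unwrap()
}

/// Hypothetical stand-in for the suggested routing. The real change would call
/// `decode_utf8_spark_lossy`; the std lossy decoder is used here only so the
/// sketch compiles on its own.
fn render_lossy(value: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(value)
}
```

With a lone `0xFF` byte, `render_strict` panics while `render_lossy` yields a string containing `U+FFFD`.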



##########
native/spark-expr/src/utils.rs:
##########
@@ -373,6 +374,123 @@ pub fn unlikely(b: bool) -> bool {
     b
 }
 
+/// Decode `bytes` as UTF-8 the way Spark renders `StringType` -- `new String(bytes, UTF_8)` on the
+/// JVM -- replacing each ill-formed sequence with a single `U+FFFD` and skipping the same number of
+/// bytes the JDK's UTF-8 `CharsetDecoder` (action REPLACE) would. Valid UTF-8 is returned as a
+/// zero-cost borrow.
+///
+/// This intentionally differs from `str::from_utf8_lossy` for surrogate-range three-byte sequences
+/// (`ED A0..BF ..`, e.g. CESU-8 / Java modified-UTF-8 supplementary chars) and for some other
+/// ill-formed multi-byte units: `from_utf8_lossy` follows the Unicode "maximal subpart" rule and
+/// can emit one `U+FFFD` per byte, whereas the JDK collapses certain ill-formed units into a single
+/// `U+FFFD`. Matching the JDK byte-for-byte means Comet renders arbitrary bytes identically to
+/// Spark -- whether they arrive via a columnar shuffle or a `CAST(binary AS string)`. The
+/// per-class malformed lengths below (E0/ED overlong & surrogate handling, F0/F4 range checks)
+/// match the observable replacement behavior of the JDK UTF-8 decoder; they were determined from
+/// observed `new String(bytes, UTF_8)` output, not by reviewing the OpenJDK source.
+pub fn decode_utf8_spark_lossy(bytes: &[u8]) -> Cow<'_, str> {
+    // Fast path: well-formed UTF-8 borrows with zero copy (the overwhelmingly common case).
+    if let Ok(s) = std::str::from_utf8(bytes) {
+        return Cow::Borrowed(s);
+    }
+
+    const RC: char = '\u{FFFD}';
+    let n = bytes.len();
+    let mut out = String::with_capacity(n);
+    let mut i = 0;
+    while i < n {

Review Comment:
   The decoder is correct and the branch guards all earn their place (they also 
double as the proof that the three `char::from_u32(cp).unwrap()` calls cannot 
fire). Two tiny idiomatic touches:
   
   - The bounds checks use `i + 1 >= n` followed by raw `bytes[i + 1]` 
indexing. Using `bytes.get(i + 1)` folds the bounds check into the access and 
reads a bit more like idiomatic Rust.
   - The `char::from_u32(cp).unwrap()` calls are provably safe given the guards 
above them. A one-line `// guards above keep cp a valid scalar` would save the 
next reader from re-deriving that, or `char::from_u32_unchecked` with a SAFETY 
note if you want to shave the check.
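
To make both touches concrete, here is a sketch on a cut-down two-byte case. `decode_two_byte` is a hypothetical helper written for illustration, not code from the PR; the actual decoder has more branches and different replacement handling.

```rust
/// Hypothetical helper: decode one two-byte UTF-8 sequence starting at `bytes[i]`.
/// Returns the decoded char and the number of bytes consumed, or `None` when the
/// sequence is truncated or ill-formed.
fn decode_two_byte(bytes: &[u8], i: usize) -> Option<(char, usize)> {
    let b0 = *bytes.get(i)?;
    // C2..=DF are the only valid two-byte lead bytes (C0/C1 would be overlong).
    if !(0xC2..=0xDF).contains(&b0) {
        return None;
    }
    // `get` folds the bounds check into the access, so there is no separate
    // `i + 1 >= n` test followed by raw indexing.
    let b1 = *bytes.get(i + 1)?;
    // A continuation byte must look like 10xxxxxx.
    if b1 & 0xC0 != 0x80 {
        return None;
    }
    let cp = (u32::from(b0 & 0x1F) << 6) | u32::from(b1 & 0x3F);
    // Guards above keep cp in 0x80..=0x7FF, which is always a valid scalar.
    let ch = char::from_u32(cp).expect("two-byte code point is a valid scalar");
    Some((ch, 2))
}
```

The comment next to `char::from_u32` carries the invariant, so the next reader does not have to re-derive it from the guards.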



##########
native/spark-expr/src/conversion_funcs/cast.rs:
##########
@@ -854,19 +856,51 @@ fn spark_binary_formatter(value: &[u8], binary_output_style: BinaryOutputStyle)
 }
 
 fn cast_binary_formatter(value: &[u8]) -> String {
-    match String::from_utf8(value.to_vec()) {
-        Ok(value) => value,
-        Err(_) => unsafe { String::from_utf8_unchecked(value.to_vec()) },
-    }
+    // CAST(binary AS string) reinterprets the bytes as UTF-8, like Spark's UTF8String.fromBytes.
+    // Spark keeps the raw bytes, but Arrow's Utf8 type requires valid UTF-8, and building a String
+    // from non-UTF-8 bytes is undefined behaviour (#4488). Decode JVM-compatibly-lossily instead:
+    // `decode_utf8_spark_lossy` replaces ill-formed sequences with U+FFFD exactly as Spark's
+    // `new String(bytes, UTF_8)` does (the same decoder Comet's native shuffle uses, #4521). The
+    // result is memory-safe valid UTF-8, never feeds invalid bytes into downstream native string
+    // kernels, and matches Spark's rendered output byte-for-byte. It diverges from Spark only under
+    // byte-level round-trips such as CAST(CAST(x AS string) AS binary), where Spark still has the
+    // original bytes and Comet has the U+FFFD replacements.
+    decode_utf8_spark_lossy(value).into_owned()

Review Comment:
   Small performance thought: in the common all-valid-UTF-8 case the decoder 
hands back a borrowed `Cow`, then `into_owned()` allocates a fresh `String` and 
copies the bytes in, and the `collect::<GenericStringArray>()` in 
`cast_binary_to_string` immediately copies those same bytes a second time into 
the Arrow value buffer before dropping the `String`. That is one malloc plus 
one free per non-null row that does not buy anything. It is not a regression, 
the old code allocated too, but since this function is already being reworked 
it might be a good moment to capture the win.
   
   One option that matches what `cast_array_to_string` already does at lines 
641 and 703 is to build with a `GenericStringBuilder<O>` and append the `&str` 
straight from the `Cow`, so the valid path copies once instead of twice. 
Roughly:
   
   ```rust
   fn cast_binary_to_string<O: OffsetSizeTrait>(
       array: &dyn Array,
       spark_cast_options: &SparkCastOptions,
   ) -> Result<ArrayRef, ArrowError> {
       let input = array
           .as_any()
           .downcast_ref::<GenericByteArray<GenericBinaryType<O>>>()
           .unwrap();
   
       let mut builder = GenericStringBuilder::<O>::new();
       for value in input.iter() {
           match value {
               Some(bytes) => match spark_cast_options.binary_output_style {
                   Some(s) => builder.append_value(spark_binary_formatter(bytes, s)),
                   None => builder.append_value(decode_utf8_spark_lossy(bytes).as_ref()),
               },
               None => builder.append_null(),
           }
       }
       Ok(Arc::new(builder.finish()))
   }
   ```
   
   That keeps the `ToPrettyString` styles exactly as they are (they still build 
owned strings, which is unavoidable) and only removes the throwaway allocation 
on the hot cast path.
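
The copy-once argument can be checked without Arrow. In the sketch below, `ValueBuffer` is a hypothetical stand-in for a string builder (a contiguous value buffer plus offsets), and `decode` uses `String::from_utf8_lossy` in place of `decode_utf8_spark_lossy`; both are assumptions made so the example compiles on its own.

```rust
use std::borrow::Cow;

/// Hypothetical stand-in for a string builder: one contiguous value buffer
/// plus end offsets, so each appended value is copied exactly once.
struct ValueBuffer {
    data: String,
    offsets: Vec<usize>,
}

impl ValueBuffer {
    fn new() -> Self {
        Self { data: String::new(), offsets: vec![0] }
    }

    fn append_value(&mut self, s: &str) {
        self.data.push_str(s);
        self.offsets.push(self.data.len());
    }
}

/// Stand-in decoder: borrows when the input is already valid UTF-8.
fn decode(bytes: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(bytes)
}

fn build(rows: &[&[u8]]) -> ValueBuffer {
    let mut buf = ValueBuffer::new();
    for row in rows {
        // For valid input the `Cow` is borrowed, so the `push_str` inside
        // `append_value` is the only copy; no intermediate `String` is created.
        buf.append_value(decode(row).as_ref());
    }
    buf
}
```

Calling `into_owned()` before appending would instead allocate a `String` per row and then copy it again into the buffer.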



##########
native/spark-expr/src/utils.rs:
##########
@@ -373,6 +374,123 @@ pub fn unlikely(b: bool) -> bool {
     b
 }
 
+/// Decode `bytes` as UTF-8 the way Spark renders `StringType` -- `new String(bytes, UTF_8)` on the
+/// JVM -- replacing each ill-formed sequence with a single `U+FFFD` and skipping the same number of
+/// bytes the JDK's UTF-8 `CharsetDecoder` (action REPLACE) would. Valid UTF-8 is returned as a
+/// zero-cost borrow.
+///
+/// This intentionally differs from `str::from_utf8_lossy` for surrogate-range three-byte sequences
+/// (`ED A0..BF ..`, e.g. CESU-8 / Java modified-UTF-8 supplementary chars) and for some other
+/// ill-formed multi-byte units: `from_utf8_lossy` follows the Unicode "maximal subpart" rule and
+/// can emit one `U+FFFD` per byte, whereas the JDK collapses certain ill-formed units into a single
+/// `U+FFFD`. Matching the JDK byte-for-byte means Comet renders arbitrary bytes identically to
+/// Spark -- whether they arrive via a columnar shuffle or a `CAST(binary AS string)`. The
+/// per-class malformed lengths below (E0/ED overlong & surrogate handling, F0/F4 range checks)
+/// match the observable replacement behavior of the JDK UTF-8 decoder; they were determined from
+/// observed `new String(bytes, UTF_8)` output, not by reviewing the OpenJDK source.
+pub fn decode_utf8_spark_lossy(bytes: &[u8]) -> Cow<'_, str> {

Review Comment:
   nit: `decode_utf8_spark_lossy` is a pure byte-to-string primitive with no 
expression dependencies, and its sibling JVM-bytes helper `bytes_to_i128` 
already lives in `datafusion-comet-common`, which both the shuffle and 
`spark-expr` already depend on. Putting the decoder there too would avoid the 
shuffle crate reaching into the expression crate just for this, and would keep 
the two "render arbitrary JVM bytes" helpers side by side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

