[ https://issues.apache.org/jira/browse/SPARK-55787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joaquín Antonio De Vicente López updated SPARK-55787:
-----------------------------------------------------
    Description: 
h3. Problem

Spark currently lacks a built-in way to determine whether a struct contains 
only NULL values, without explicitly checking every field or performing 
expensive serialization.

In Structured Streaming pipelines using Kafka + Avro (via ABRiS), when 
deserialization fails and {{PermissiveRecordExceptionHandler}} is used, the 
handler produces a struct that is *non-null but has ALL
fields set to null*:
{code:scala}
class PermissiveRecordExceptionHandler() extends DeserializationExceptionHandler with Logging {

  def handle(exception: Throwable, deserializer: AbrisAvroDeserializer, readerSchema: Schema): Any = {
    logWarning("Malformed record detected. Replacing with full null row.", exception)
    val record = new GenericData.Record(readerSchema)
    deserializer.deserialize(record)
  }
}
{code}
Currently, the only schema-agnostic way to detect these "empty" structs is:
{code:scala}
  df.filter(col("value").isNotNull && to_json(col("value")) =!= "{}"){code}
This is problematic because:
 * {{to_json}} forces *full JSON serialization* of every row (Jackson allocation + byte array + UTF8String)
 * It is *not type-aware* — it compares against a magic string {{"{}"}}
 * It creates significant GC pressure in high-throughput streaming (100K+ msg/sec)
 * It breaks whole-stage codegen ({{StructsToJson}} uses {{RuntimeReplaceable}} → falls back to {{Invoke}})

In streaming ingestion pipelines, if these all-null structs are not filtered 
out, they may propagate downstream to sinks such as Apache Kudu. When struct 
fields correspond to primary key columns, this can cause runtime failures due 
to NULL values in primary keys, aborting the streaming query. Preventing such 
rows in an efficient and schema-agnostic way is therefore not only a 
performance concern but also a correctness and reliability requirement.

This functionality is currently not expressible in a schema-agnostic and 
optimizer-friendly way using existing built-in functions.
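
For contrast, the closest alternative with existing built-ins must enumerate every top-level field by name, so it has to be rewritten on every schema change (the field names {{a}}, {{b}}, {{c}} below are hypothetical, for illustration only):
{code:scala}
import org.apache.spark.sql.functions.col

// Schema-dependent workaround: hand-list each top-level field.
// Silently misses data if the Avro schema gains or renames a field.
df.filter(
  col("value").isNotNull &&
    (col("value.a").isNotNull || col("value.b").isNotNull || col("value.c").isNotNull))
{code}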
h3. Proposed Solution

Add two new built-in SQL functions:
{code:sql}
is_struct_empty(struct)     -- TRUE if non-null struct has all null fields
is_struct_non_empty(struct) -- TRUE if non-null struct has at least one non-null field
{code}
*Usage:*
{code:scala}
// New — null-bitmap check, no serialization or per-row allocation
df.filter(col("value").isNotNull && is_struct_non_empty(col("value")))
{code}
{code:sql}
-- SQL syntax
SELECT * FROM kafka_messages
WHERE value IS NOT NULL AND is_struct_non_empty(value)
{code}
h3. Semantics
||Input State||is_struct_empty||is_struct_non_empty||
|NULL struct|NULL|NULL|
|Non-null, all fields null|TRUE|FALSE|
|Non-null, at least one field non-null|FALSE|TRUE|
|Non-null, zero fields (empty schema)|TRUE|FALSE|
 * Follows SQL three-valued logic: NULL input → NULL output
 * Shallow check only: a nested struct that is itself non-null (even if its 
children are all null) counts as non-null at the parent level
 * Non-null empty arrays/maps inside the struct count as non-null (they are 
values, not null)
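
The table and rules above can be sketched as executable reference semantics in plain Scala, modeling a struct as {{Option[Seq[Option[Any]]]}} ({{None}} for a NULL struct, inner {{None}} entries for NULL fields). This is only an illustration of the intended behavior, not the proposed Catalyst implementation:
{code:scala}
// None = NULL struct; Some(fields) = non-null struct whose None entries are NULL fields.
// A nested struct value is just a non-None Any, so the check stays shallow.
def isStructEmpty(s: Option[Seq[Option[Any]]]): Option[Boolean] =
  s.map(_.forall(_.isEmpty)) // vacuously TRUE for a zero-field struct

def isStructNonEmpty(s: Option[Seq[Option[Any]]]): Option[Boolean] =
  s.map(_.exists(_.isDefined)) // short-circuits at the first non-null field
{code}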

h3. Performance Comparison
||Dimension||to_json =!= "{}"||is_struct_non_empty||
|Object allocation|JacksonGenerator, StringWriter, JsonGenerator, UTF8String, byte[] per row|*Zero*|
|Serialization|Full JSON of ALL fields|*None* — bitwise null-check only|
|Short-circuit|No — must serialize entire struct|Yes — stops at first non-null field|
|CPU cost|O(fields × type_dispatch + string_compare)|O(1) best case, O(fields) worst case|
|GC pressure|High|*None*|
|Codegen|Falls back to Invoke (no tight codegen)|Full whole-stage codegen, inlined null checks|
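
The right-hand column rests on the expression doing nothing but probing the row's null bits. A minimal sketch of what the interpreted evaluation path could look like, assuming a new unary Catalyst expression (the surrounding class and {{numFields}} member are hypothetical; {{InternalRow.isNullAt}} is the real Spark API):
{code:scala}
// Hypothetical eval body for is_struct_non_empty: touches only null bits,
// allocates nothing, and stops at the first non-null field.
override def nullSafeEval(input: Any): Any = {
  val row = input.asInstanceOf[org.apache.spark.sql.catalyst.InternalRow]
  var i = 0
  while (i < numFields) { // numFields derived from the child's StructType
    if (!row.isNullAt(i)) return true
    i += 1
  }
  false
}
{code}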
h3. References
 * ABRiS (Avro Bridge for Spark): [https://github.com/AbsaOSS/ABRiS]
 * Similar concepts in other engines:
 ** BigQuery: struct comparison semantics handle NULL fields natively
 ** DuckDB: struct null-awareness in predicates

> Add built-in function to detect all-null structs
> ------------------------------------------------
>
>                 Key: SPARK-55787
>                 URL: https://issues.apache.org/jira/browse/SPARK-55787
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 4.1.1
>            Reporter: Joaquín Antonio De Vicente López
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
