[
https://issues.apache.org/jira/browse/SPARK-55787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Joaquín Antonio De Vicente López updated SPARK-55787:
-----------------------------------------------------
Description:
h3. Problem
Spark currently lacks a built-in way to determine whether a struct contains
only NULL values, without explicitly checking every field or performing
expensive serialization.
In Structured Streaming pipelines using Kafka + Avro (via ABRiS), when
deserialization fails and {{PermissiveRecordExceptionHandler}} is used, the
handler produces a struct that is *non-null but has ALL
fields set to null*:
{code:scala}
class PermissiveRecordExceptionHandler() extends DeserializationExceptionHandler with Logging {

  def handle(exception: Throwable, deserializer: AbrisAvroDeserializer, readerSchema: Schema): Any = {
    logWarning("Malformed record detected. Replacing with full null row.", exception)
    val record = new GenericData.Record(readerSchema)
    deserializer.deserialize(record)
  }
}
{code}
Currently, the only schema-agnostic way to detect these "empty" structs is:
{code:scala}
df.filter(col("value").isNotNull && to_json(col("value")) =!= "{}"){code}
This is problematic because:
* {{to_json}} forces *full JSON serialization* of every row (Jackson
allocation + byte array + UTF8String)
* It is *not type-aware* — it compares against a magic string {{"{}"}}
* It creates significant GC pressure in high-throughput streaming (100K+
msg/sec)
* It breaks whole-stage codegen ({{StructsToJson}} uses {{RuntimeReplaceable}} → falls back to {{Invoke}})
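To make the workaround's behavior concrete, here is a toy dict-based model in plain Python (illustrative only, not Spark internals). It assumes {{to_json}}'s default behavior of omitting null fields, which is why an all-null struct serializes to the magic string {{"{}"}}:

```python
import json

def to_json_model(struct: dict) -> str:
    # Model of to_json: null (None) fields are omitted from the output.
    return json.dumps({k: v for k, v in struct.items() if v is not None})

def is_all_null_via_json(struct: dict) -> bool:
    # The workaround: serialize every field, then compare a magic string.
    return to_json_model(struct) == "{}"

def is_all_null_via_nullcheck(struct: dict) -> bool:
    # The proposed alternative: inspect nulls directly, no serialization.
    return all(v is None for v in struct.values())

malformed = {"id": None, "name": None, "ts": None}
ok = {"id": 7, "name": None, "ts": None}

assert is_all_null_via_json(malformed) and is_all_null_via_nullcheck(malformed)
assert not is_all_null_via_json(ok) and not is_all_null_via_nullcheck(ok)
```

Both predicates agree on the result; the difference is that the JSON route allocates and serializes per row just to make a boolean decision.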
h3. Proposed Solution
Add two new built-in SQL functions:
{code:sql}
is_struct_empty(struct)     -- TRUE if a non-null struct has all null fields
is_struct_non_empty(struct) -- TRUE if a non-null struct has at least one non-null field
{code}
*Usage:*
{code:scala}
// New — zero-cost null-bitmap check
df.filter(col("value").isNotNull && is_struct_non_empty(col("value")))
{code}
{code:sql}
-- SQL syntax
SELECT * FROM kafka_messages
WHERE value IS NOT NULL AND is_struct_non_empty(value)
{code}
h3. Semantics
||Input State||is_struct_empty||is_struct_non_empty||
|NULL struct|NULL|NULL|
|Non-null, all fields null|TRUE|FALSE|
|Non-null, at least one field non-null|FALSE|TRUE|
|Non-null, zero fields (empty schema)|TRUE|FALSE|
* Follows SQL three-valued logic: NULL input → NULL output
* Shallow check only: a nested struct that is itself non-null (even if its
children are all null) counts as non-null at the parent level
* Non-null empty arrays/maps inside the struct count as non-null (they are
values, not null)
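The table and bullets above can be pinned down as a small executable reference model. Plain Python dicts stand in for structs here ({{None}} models SQL NULL), so this is a sketch of the proposed semantics, not an implementation:

```python
from typing import Optional

def is_struct_empty(struct: Optional[dict]) -> Optional[bool]:
    """Reference model: None models a NULL struct; dict values model fields."""
    if struct is None:
        return None  # three-valued logic: NULL input -> NULL output
    # Shallow check: a non-null nested struct, or a non-null empty
    # array/map, is a value and therefore counts as non-null here.
    return all(v is None for v in struct.values())

def is_struct_non_empty(struct: Optional[dict]) -> Optional[bool]:
    empty = is_struct_empty(struct)
    return None if empty is None else not empty

assert is_struct_empty(None) is None                      # NULL struct
assert is_struct_empty({"a": None, "b": None}) is True    # all fields null
assert is_struct_empty({"a": 1, "b": None}) is False      # one non-null field
assert is_struct_empty({}) is True                        # zero fields (empty schema)
assert is_struct_empty({"nested": {"x": None}}) is False  # shallow: nested struct is a value
assert is_struct_non_empty({"arr": []}) is True           # empty array is a value
```

Note that {{all()}} short-circuits at the first non-null field, which is exactly the evaluation strategy claimed for the built-in below.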
h3. Performance Comparison
||Dimension||to_json =!= "{}"||is_struct_non_empty||
|Object allocation|JacksonGenerator, StringWriter, JsonGenerator, UTF8String, byte[] per row|*Zero*|
|Serialization|Full JSON of ALL fields|*None* — bitwise null-check only|
|Short-circuit|No — must serialize entire struct|Yes — stops at first non-null field|
|CPU cost|O(fields × type_dispatch + string_compare)|O(1) best case, O(fields) worst case|
|GC pressure|High|*None*|
|Codegen|Falls back to Invoke (no tight codegen)|Full whole-stage codegen, inlined null checks|
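The short-circuit row is the key asymmetry. A toy Python model that counts field visits (under the simplifying assumption that serialization must touch every field, while a null check may stop early) makes it concrete:

```python
def fields_visited_json(struct: dict) -> int:
    # Serialization has to inspect every field to build the JSON string.
    visited = 0
    for _ in struct.values():
        visited += 1
    return visited

def fields_visited_nullcheck(struct: dict) -> int:
    # The proposed check can stop at the first non-null field.
    visited = 0
    for v in struct.values():
        visited += 1
        if v is not None:
            break
    return visited

# A 100-field struct whose very first field is non-null:
wide = {"f0": 42, **{f"f{i}": None for i in range(1, 100)}}
assert fields_visited_json(wide) == 100     # O(fields) always
assert fields_visited_nullcheck(wide) == 1  # O(1) best case
```

The worst case for the null check (an all-null struct) still visits every field, but does no allocation or string work along the way.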
h3. References
* ABRiS (Avro Bridge for Spark): [https://github.com/AbsaOSS/ABRiS]
* Similar concepts in other engines:
** BigQuery: struct comparison semantics handle NULL fields natively
** DuckDB: struct null-awareness in predicates
> Add built-in function to detect all-null structs
> ------------------------------------------------
>
> Key: SPARK-55787
> URL: https://issues.apache.org/jira/browse/SPARK-55787
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 4.1.1
> Reporter: Joaquín Antonio De Vicente López
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]