[
https://issues.apache.org/jira/browse/SPARK-55787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Joaquín Antonio De Vicente López updated SPARK-55787:
-----------------------------------------------------
Description:
h3. Problem
Spark currently lacks a built-in way to determine whether a struct contains
only NULL values, without explicitly checking every field or performing
expensive serialization.
In Structured Streaming pipelines using Kafka + Avro (via ABRiS), when
deserialization fails and {{PermissiveRecordExceptionHandler}} is used, the
handler produces a struct that is *non-null but has ALL
fields set to null*:
{code:scala}
class PermissiveRecordExceptionHandler() extends DeserializationExceptionHandler with Logging {

  def handle(exception: Throwable, deserializer: AbrisAvroDeserializer, readerSchema: Schema): Any = {
    logWarning("Malformed record detected. Replacing with full null row.", exception)
    val record = new GenericData.Record(readerSchema)
    deserializer.deserialize(record)
  }
}
{code}
Currently, the only schema-agnostic way to detect these "empty" structs is:
{code:scala}
df.filter(col("value").isNotNull && to_json(col("value")) =!= "{}"){code}
This is problematic because:
* {{to_json}} forces *full JSON serialization* of every row (Jackson
allocation + byte array + UTF8String)
* It is *not type-aware* — it compares against a magic string {{"{}"}}
* It creates significant GC pressure in high-throughput streaming (100K+
msg/sec)
* It breaks whole-stage codegen ({{StructsToJson}} uses {{RuntimeReplaceable}} → falls back to {{Invoke}})
In streaming ingestion pipelines, if these all-null structs are not filtered
out, they may propagate downstream to sinks such as Apache Kudu. When struct
fields correspond to primary key columns, this can cause runtime failures due
to NULL values in primary keys, aborting the streaming query. Preventing such
rows in an efficient and schema-agnostic way is therefore not only a
performance concern but also a correctness and reliability requirement.
This functionality is currently not expressible in a schema-agnostic and
optimizer-friendly way using existing built-in functions.
h3. Proposed Solution
Add two new built-in SQL functions:
{code:sql}
is_struct_empty(struct)      -- TRUE if a non-null struct has all fields null
is_struct_non_empty(struct)  -- TRUE if a non-null struct has at least one non-null field
{code}
*Usage:*
{code:scala}
// New — zero-allocation null-bitmap check
df.filter(col("value").isNotNull && is_struct_non_empty(col("value")))
{code}
{code:sql}
-- SQL syntax
SELECT * FROM kafka_messages
WHERE value IS NOT NULL AND is_struct_non_empty(value)
{code}
h3. Semantics
||Input State||is_struct_empty||is_struct_non_empty||
|NULL struct|NULL|NULL|
|Non-null, all fields null|TRUE|FALSE|
|Non-null, at least one field non-null|FALSE|TRUE|
|Non-null, zero fields (empty schema)|TRUE|FALSE|
* Follows SQL three-valued logic: NULL input → NULL output
* Shallow check only: a nested struct that is itself non-null (even if its
children are all null) counts as non-null at the parent level
* Non-null empty arrays/maps inside the struct count as non-null (they are
values, not null)
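The table above can be cross-checked against a small, Spark-free Scala model of the intended semantics. This is an illustrative sketch only — the names and the {{Option[Seq[Any]]}} encoding are assumptions for this ticket, not the proposed Catalyst implementation: {{None}} models a NULL struct, and {{null}} entries model null fields.

```scala
// Reference model of the proposed semantics (illustrative sketch, not Catalyst code).
// A struct value is modeled as Option[Seq[Any]]:
//   None         -> NULL struct
//   Some(fields) -> non-null struct whose entries may be null
object StructEmptiness {
  // NULL struct -> None, matching SQL three-valued logic (NULL in -> NULL out).
  // A zero-field struct is "empty" because forall on an empty Seq is true.
  def isStructEmpty(struct: Option[Seq[Any]]): Option[Boolean] =
    struct.map(_.forall(_ == null))

  // Exact negation of isStructEmpty for non-null inputs; still None for NULL.
  def isStructNonEmpty(struct: Option[Seq[Any]]): Option[Boolean] =
    isStructEmpty(struct).map(!_)
}
```

Note the check is shallow by construction: a nested struct shows up here as a single non-null entry, regardless of its own children.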
h3. Performance Comparison
||Dimension||to_json =!= "{}"||is_struct_non_empty||
|Object allocation|JacksonGenerator, StringWriter, JsonGenerator, UTF8String, byte[] per row|*Zero*|
|Serialization|Full JSON of ALL fields|*None* — bitwise null-check only|
|Short-circuit|No — must serialize entire struct|Yes — stops at first non-null field|
|CPU cost|O(fields × type_dispatch + string_compare)|O(1) best case, O(fields) worst case|
|GC pressure|High|*None*|
|Codegen|Falls back to Invoke (no tight codegen)|Full whole-stage codegen, inlined null checks|
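The short-circuit and zero-allocation rows can be illustrated with a Spark-free sketch of the scan the new expressions would perform over a row's null bits. The {{Array[Boolean]}} encoding and the name {{hasNonNullField}} are assumptions for illustration, not the actual UnsafeRow bitmap layout:

```scala
// Illustrative scan over a struct's null bits (not actual Catalyst/UnsafeRow code).
// nullBits(i) == true means field i is null.
object NullBitmapScan {
  // Stops at the first non-null field: O(1) best case, O(numFields) worst case,
  // and no per-row object allocation.
  def hasNonNullField(nullBits: Array[Boolean]): Boolean = {
    var i = 0
    while (i < nullBits.length) {
      if (!nullBits(i)) return true
      i += 1
    }
    false
  }
}
```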
h3. References
* ABRiS (Avro Bridge for Spark): [https://github.com/AbsaOSS/ABRiS]
* Similar concepts in other engines:
** BigQuery: struct comparison semantics handle NULL fields natively
** DuckDB: struct null-awareness in predicates
> Add built-in function to detect all-null structs
> ------------------------------------------------
>
> Key: SPARK-55787
> URL: https://issues.apache.org/jira/browse/SPARK-55787
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 4.1.1
> Reporter: Joaquín Antonio De Vicente López
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]