[
https://issues.apache.org/jira/browse/SPARK-37784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Josh Rosen resolved SPARK-37784.
--------------------------------
Fix Version/s: 3.3.0
3.0.4
3.2.1
3.1.3
Resolution: Fixed
Issue resolved by pull request 35066
[https://github.com/apache/spark/pull/35066]
> CodeGenerator.addBufferedState() does not properly handle UDTs
> --------------------------------------------------------------
>
> Key: SPARK-37784
> URL: https://issues.apache.org/jira/browse/SPARK-37784
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Josh Rosen
> Assignee: Josh Rosen
> Priority: Major
> Labels: correctness
> Fix For: 3.3.0, 3.0.4, 3.2.1, 3.1.3
>
>
> The {{CodeGenerator.addBufferedState()}} method does not properly handle
> UDTs: it pattern-matches on a data type to determine whether {{copy()}} or
> {{clone()}} operations need to be performed, but the current pattern match
> does not handle UDTs and instead falls through to the default case, which
> causes values to be stored without copying. This is problematic if the UDT's
> underlying data type requires copying (i.e. the UDT is internally represented
> using an array, struct, map, or string type): without a copy, the buffered
> state can end up pointing at row memory that is later reused or freed.
>
> This issue impacts queries which use sort-merge join where UDTs appear as
> part of the join keys.
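> For context, the copy decision in question has roughly the following shape (a
> simplified sketch with a hypothetical {{copyCode}} helper, not a verbatim
> excerpt of {{CodeGenerator.addBufferedState()}}):
> {code:java}
> import org.apache.spark.sql.types._
>
> // Simplified sketch: decide how a buffered value should be stored.
> def copyCode(dataType: DataType, value: String, initCode: String): String =
>   dataType match {
>     case StringType => s"$value = $initCode.clone();"                               // UTF8String: clone
>     case _: StructType | _: ArrayType | _: MapType => s"$value = $initCode.copy();" // containers: copy
>     case _ => s"$value = $initCode;" // a UDT matches none of the above and is stored without copying
>   }{code}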
> I discovered this while investigating a query which failed with segfaults. I
> managed to shrink my original query down to the following reproduction (which
> uses Spark's built-in Vector UDT):
> {code:java}
> import org.apache.spark.ml.linalg.Vectors
> val df = spark.createDataFrame(
>   Seq(
>     (Vectors.dense(1.0), Vectors.dense(1.0)),
>     (Vectors.dense(1.0), Vectors.dense(2.0))
>   )).toDF("key", "value")
>
> sql("set spark.sql.adaptive.enabled = false")
> sql("set spark.sql.autoBroadcastJoinThreshold = -1")
> sql("set spark.sql.shuffle.partitions = 1")
>
> df.join(df, "key").show()
> df.join(df, "key").explain("codegen")
> df.join(df, "key").show() {code}
> When run with off-heap memory enabled, this failed with a segfault at the
> following stack:
> {code:java}
> Stack: [0x00007f518a7b5000,0x00007f518abb6000], sp=0x00007f518abb32e0, free space=4088k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
> J 12956 C1 org.apache.spark.unsafe.Platform.getLong(Ljava/lang/Object;J)J (9 bytes) @ 0x00007f5e5ec2809e [0x00007f5e5ec28060+0x3e]
> j  org.apache.spark.unsafe.array.ByteArrayMethods.arrayEquals(Ljava/lang/Object;JLjava/lang/Object;JJ)Z+135
> j  org.apache.spark.sql.catalyst.expressions.UnsafeRow.equals(Ljava/lang/Object;)Z+44
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.smj_compareStruct_0(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;)I+16
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.smj_findNextInnerJoinRows_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage1;Lscala/collection/Iterator;Lscala/collection/Iterator;)Z+107
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext()V+393
> j  org.apache.spark.sql.execution.BufferedRowIterator.hasNext()Z+11
> j  org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext()Z+4{code}
> Please note that this particular reproduction does not fail in all
> environments (I can't reproduce it on my laptop, for example, or on certain
> EC2 instance types).
> Here is an annotated excerpt from the generated code which shows the source
> of the problem:
> {code:java}
> /* 223 */   private boolean smj_findNextJoinRows_0(
> /* 224 */       scala.collection.Iterator streamedIter,
> /* 225 */       scala.collection.Iterator bufferedIter) {
> /* 226 */     smj_streamedRow_0 = null;
> /* 227 */     int comp = 0;
> /* 228 */     while (smj_streamedRow_0 == null) {
> /* 229 */       if (!streamedIter.hasNext()) return false;
> /* 230 */       smj_streamedRow_0 = (InternalRow) streamedIter.next();
> /* 231 */       boolean smj_isNull_0 = smj_streamedRow_0.isNullAt(0);
>                 // smj_value_0 is a value retrieved from a streamed row:
>                 // |
>                 // V
> /* 232 */       InternalRow smj_value_0 = smj_isNull_0 ?
> /* 233 */       null : (smj_streamedRow_0.getStruct(0, 4));
> [...]
>                 // This value is stored in smj_mutableStateArray_0[0] without
>                 // copying (even though it's a struct, not an atomic type):
> /* 265 */       smj_mutableStateArray_0[0] = smj_value_1;
> /* 266 */     }{code}
> I believe the fix for this bug is fairly simple: we just need to modify
> {{CodeGenerator.addBufferedState()}} so that it uses UDTs' underlying sqlType
> when determining whether value copying is needed.
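> A rough sketch of that idea (hypothetical helper names, not the actual patch;
> the real change is in the pull request linked above) would unwrap UDTs to
> their underlying {{sqlType}} before the existing match:
> {code:java}
> import org.apache.spark.sql.types._
>
> // Hypothetical sketch: resolve a UDT to its underlying sqlType (recursively,
> // in case a UDT is itself backed by another UDT) before deciding on copying.
> def unwrapUDT(dataType: DataType): DataType = dataType match {
>   case udt: UserDefinedType[_] => unwrapUDT(udt.sqlType)
>   case other => other
> }
>
> def copyCode(dataType: DataType, value: String, initCode: String): String =
>   unwrapUDT(dataType) match {
>     case StringType => s"$value = $initCode.clone();"
>     case _: StructType | _: ArrayType | _: MapType => s"$value = $initCode.copy();"
>     case _ => s"$value = $initCode;"
>   }{code}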
> I've labeled this as a correctness issue because a "missing copying" bug can
> theoretically lead to wrong query results, not just crashes, although I
> haven't been able to contrive a test case demonstrating a wrong result due to
> this bug.