[ 
https://issues.apache.org/jira/browse/SPARK-37784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen resolved SPARK-37784.
--------------------------------
    Fix Version/s: 3.3.0
                   3.0.4
                   3.2.1
                   3.1.3
       Resolution: Fixed

Issue resolved by pull request 35066
[https://github.com/apache/spark/pull/35066]

> CodeGenerator.addBufferedState() does not properly handle UDTs
> --------------------------------------------------------------
>
>                 Key: SPARK-37784
>                 URL: https://issues.apache.org/jira/browse/SPARK-37784
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Major
>              Labels: correctness
>             Fix For: 3.3.0, 3.0.4, 3.2.1, 3.1.3
>
>
> The {{CodeGenerator.addBufferedState()}} method does not properly handle 
> UDTs: it pattern-matches on a data type to determine whether {{copy()}} or 
> {{clone()}} operations need to be performed, but the current pattern match 
> does not handle UDTs and instead falls through to the default case, which 
> causes values to be stored without copying. This is problematic if the UDT's 
> underlying data type requires copying (i.e. the UDT is internally represented 
> using an array, struct, map, or string type).
> This issue impacts queries which use sort-merge join where UDTs appear as 
> part of join keys.
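As a hedged illustration of the hazard (plain Scala with invented names, not Spark internals): sort-merge join buffers values taken from rows whose backing memory is reused between records, so storing a reference without a defensive copy leaves the buffered value pointing at memory that the iterator will overwrite:

```scala
// Hypothetical model of a reused mutable row (names invented for
// illustration; this is plain Scala, not Spark internals).
final class MutableRow(var value: Long) {
  // Defensive copy, analogous in spirit to InternalRow.copy().
  def copyRow(): MutableRow = new MutableRow(value)
}

object BufferedStateDemo {
  def main(args: Array[String]): Unit = {
    // Buffering without a copy keeps a reference to the reused row...
    val reused = new MutableRow(1L)
    val bufferedNoCopy = reused
    // ...so when the iterator overwrites the row for the next record,
    // the "buffered" value silently changes too:
    reused.value = 2L
    assert(bufferedNoCopy.value == 2L) // stale: the buffered 1L is gone

    // Copying at buffering time preserves the snapshot:
    val fresh = new MutableRow(1L)
    val bufferedCopy = fresh.copyRow()
    fresh.value = 2L
    assert(bufferedCopy.value == 1L) // correct
  }
}
```

In the real bug the "row" is an UnsafeRow over a reused (possibly off-heap) buffer, which is why the failure can surface as a segfault rather than just a wrong value.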
> I discovered this while investigating a query which failed with segfaults. I 
> managed to shrink my original query down to the following reproduction (which 
> uses Spark's built-in Vector UDT):
> {code:java}
> import org.apache.spark.ml.linalg.Vectors
> val df = spark.createDataFrame(
>   Seq(
>     (Vectors.dense(1.0), Vectors.dense(1.0)),
>     (Vectors.dense(1.0), Vectors.dense(2.0))
>   )).toDF("key", "value")
> sql("set spark.sql.adaptive.enabled = false")
> sql("set spark.sql.autoBroadcastJoinThreshold = -1")
> sql("set spark.sql.shuffle.partitions = 1")
> df.join(df, "key").show()
> df.join(df, "key").explain("codegen")
> df.join(df, "key").show() {code}
> When run with off-heap memory enabled, this fails with a segfault; the 
> native stack trace is:
> {code:java}
> Stack: [0x00007f518a7b5000,0x00007f518abb6000],  sp=0x00007f518abb32e0,  free space=4088k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
> J 12956 C1 org.apache.spark.unsafe.Platform.getLong(Ljava/lang/Object;J)J (9 bytes) @ 0x00007f5e5ec2809e [0x00007f5e5ec28060+0x3e]
> j  
> org.apache.spark.unsafe.array.ByteArrayMethods.arrayEquals(Ljava/lang/Object;JLjava/lang/Object;JJ)Z+135
> j  
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.equals(Ljava/lang/Object;)Z+44
> j  
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.smj_compareStruct_0(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;)I+16
> j  
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.smj_findNextInnerJoinRows_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage1;Lscala/collection/Iterator;Lscala/collection/Iterator;)Z+107
> j  
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext()V+393
> j  org.apache.spark.sql.execution.BufferedRowIterator.hasNext()Z+11
> j  
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext()Z+4{code}
> Please note that this particular reproduction does not fail in all 
> environments (I can't reproduce it on my laptop, for example, or on certain 
> EC2 instance types).
> Here is an annotated excerpt from the generated code which shows the source 
> of the problem:
> {code:java}
> /* 223 */   private boolean smj_findNextJoinRows_0(
> /* 224 */     scala.collection.Iterator streamedIter,
> /* 225 */     scala.collection.Iterator bufferedIter) {
> /* 226 */     smj_streamedRow_0 = null;
> /* 227 */     int comp = 0;
> /* 228 */     while (smj_streamedRow_0 == null) {
> /* 229 */       if (!streamedIter.hasNext()) return false;
> /* 230 */       smj_streamedRow_0 = (InternalRow) streamedIter.next();
> /* 231 */       boolean smj_isNull_0 = smj_streamedRow_0.isNullAt(0);
> // smj_value_0 is a value retrieved from a streamed row:
> //                              |
> //                              V
> /* 232 */       InternalRow smj_value_0 = smj_isNull_0 ?
> /* 233 */       null : (smj_streamedRow_0.getStruct(0, 4));
> [...]
> // This value is stored in smj_mutableStateArray_0[0] without
> // copying (even though it's a struct, not an atomic type):
> /* 265 */           smj_mutableStateArray_0[0] = smj_value_1;
> /* 266 */         }{code}
> I believe the fix for this bug is fairly simple: we just need to modify 
> {{CodeGenerator.addBufferedState()}} so that it uses the UDT's underlying 
> {{sqlType}} when determining whether value copying is needed.
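A minimal sketch of that fix, using simplified stand-ins for the Catalyst types (all names here are illustrative, not the actual Spark classes): unwrap the UDT to its underlying sqlType before making the copy decision, instead of letting UDTs fall through to the no-copy default case:

```scala
// Simplified stand-ins for Catalyst data types (names are illustrative only).
sealed trait DataType
case object LongType extends DataType
case object StringType extends DataType
final case class ArrayType(elementType: DataType) extends DataType
final case class StructType(fieldTypes: List[DataType]) extends DataType
final case class MapType(keyType: DataType, valueType: DataType) extends DataType
// A UDT wraps an underlying SQL representation, like Spark's UserDefinedType.sqlType.
final case class UdtType(sqlType: DataType) extends DataType

object BufferedStateCopy {
  // Decide whether a buffered value must be defensively copied.
  def needsCopy(dt: DataType): Boolean = dt match {
    // Values backed by reused buffers must be copied before buffering.
    case _: ArrayType | _: StructType | _: MapType | StringType => true
    // The fix: recurse into the UDT's underlying sqlType. Before the fix,
    // UDTs matched the default case below and were stored without copying.
    case UdtType(sqlType) => needsCopy(sqlType)
    // Atomic types are safe to store directly.
    case _ => false
  }
}
```

Under this model, a Vector UDT (internally a struct) would now be recognized as copy-requiring, while a UDT over an atomic type would still be stored directly.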
> I've labeled this as a correctness issue because a "missing copying" bug can 
> theoretically lead to wrong query results, not just crashes, although I 
> haven't been able to contrive a test case demonstrating a wrong result due to 
> this bug.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

