[ 
https://issues.apache.org/jira/browse/SPARK-37784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-37784:
-------------------------------
    Description: 
The {{CodeGenerator.addBufferedState()}} method does not properly handle UDTs: 
it pattern-matches on a data type to determine whether {{copy()}} or 
{{clone()}} operations need to be performed, but the current pattern match does 
not handle UDTs and instead falls through to the default case, which causes 
values to be stored without copying. This is problematic if the UDT's 
underlying data type requires copying (i.e. the UDT is internally represented 
using an array, struct, map, or string type).
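
To see why the missing copy is dangerous: Spark's row iterators typically reuse a single mutable buffer across {{next()}} calls, so a stored reference silently changes when the iterator advances. Here is a minimal plain-Java sketch of that hazard ({{ReusableRow}} is a hypothetical stand-in, not Spark's actual row implementation):
{code:java}
// Hedged sketch: a row backed by a reusable buffer, illustrating why
// buffered join state must be defensively copied.
public class CopySketch {
    static final class ReusableRow {
        double value;
        ReusableRow(double value) { this.value = value; }
        ReusableRow copy() { return new ReusableRow(value); }
    }

    public static void main(String[] args) {
        ReusableRow reused = new ReusableRow(1.0);
        ReusableRow storedWithoutCopy = reused;      // what the generated code does for UDTs
        ReusableRow storedWithCopy = reused.copy();  // what it should do
        reused.value = 2.0;                          // iterator advances; buffer is overwritten
        System.out.println(storedWithoutCopy.value); // 2.0: the stale reference sees new data
        System.out.println(storedWithCopy.value);    // 1.0: the defensive copy is unaffected
    }
}
{code}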

This issue impacts queries which use sort-merge join where UDTs appear as part 
of join keys.

I discovered this while investigating a query which failed with segfaults. I 
managed to shrink my original query down to the following reproduction (which 
uses Spark's built-in Vector UDT):
{code:java}
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(
  Seq(
    (Vectors.dense(1.0), Vectors.dense(1.0)),
    (Vectors.dense(1.0), Vectors.dense(2.0))
  )).toDF("key", "value")

sql("set spark.sql.adaptive.enabled = false")
sql("set spark.sql.autoBroadcastJoinThreshold = -1")
sql("set spark.sql.shuffle.partitions = 1")

df.join(df, "key").explain("codegen")
df.join(df, "key").show() {code}
When run with off-heap memory enabled, this failed with a segfault at the following native stack:
{code:java}
Stack: [0x00007f518a7b5000,0x00007f518abb6000],  sp=0x00007f518abb32e0,  free 
space=4088k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
J 12956 C1 org.apache.spark.unsafe.Platform.getLong(Ljava/lang/Object;J)J (9 
bytes) @ 0x00007f5e5ec2809e [0x00007f5e5ec28060+0x3e]
j  
org.apache.spark.unsafe.array.ByteArrayMethods.arrayEquals(Ljava/lang/Object;JLjava/lang/Object;JJ)Z+135
j  
org.apache.spark.sql.catalyst.expressions.UnsafeRow.equals(Ljava/lang/Object;)Z+44
j  
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.smj_compareStruct_0(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;)I+16
j  
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.smj_findNextInnerJoinRows_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage1;Lscala/collection/Iterator;Lscala/collection/Iterator;)Z+107
j  
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext()V+393
j  org.apache.spark.sql.execution.BufferedRowIterator.hasNext()Z+11
j  
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext()Z+4{code}
Please note that this particular reproduction does not fail in all environments 
(I can't reproduce it on my laptop, for example, or on certain EC2 instance 
types).

Here is an annotated excerpt from the generated code which shows the source of 
the problem:
{code:java}
/* 223 */   private boolean smj_findNextJoinRows_0(
/* 224 */     scala.collection.Iterator streamedIter,
/* 225 */     scala.collection.Iterator bufferedIter) {
/* 226 */     smj_streamedRow_0 = null;
/* 227 */     int comp = 0;
/* 228 */     while (smj_streamedRow_0 == null) {
/* 229 */       if (!streamedIter.hasNext()) return false;
/* 230 */       smj_streamedRow_0 = (InternalRow) streamedIter.next();
/* 231 */       boolean smj_isNull_0 = smj_streamedRow_0.isNullAt(0);

// smj_value_0 is a value retrieved from a streamed row:
//                              |
//                              V
/* 232 */       InternalRow smj_value_0 = smj_isNull_0 ?
/* 233 */       null : (smj_streamedRow_0.getStruct(0, 4));
[...]

// This value is stored in smj_mutableStateArray_0[0] without
// copying (even though it's a struct, not an atomic type):

/* 265 */           smj_mutableStateArray_0[0] = smj_value_1;
/* 266 */         }{code}
I believe the fix for this bug is fairly simple: we just need to modify 
{{CodeGenerator.addBufferedState()}} so that it uses a UDT's underlying 
{{sqlType}} when determining whether value copying is needed.
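
As a rough illustration of that fix, the copy decision could recurse into the UDT's {{sqlType}} before falling through to the default case. The following plain-Java sketch uses simplified stand-in types (not Spark's real Catalyst {{DataType}} hierarchy), just to show the shape of the unwrapping:
{code:java}
public class NeedsCopySketch {
    // Simplified stand-ins for Catalyst data types (hypothetical, for illustration).
    interface DataType {}
    static final class IntegerType implements DataType {}
    static final class StringType implements DataType {}
    static final class StructType implements DataType {}
    // A UDT is a wrapper whose physical storage is its underlying sqlType.
    static final class UserDefinedType implements DataType {
        final DataType sqlType;
        UserDefinedType(DataType sqlType) { this.sqlType = sqlType; }
    }

    // Buffer-backed types (strings, structs, and similarly arrays/maps) must be
    // copied before being stored in mutable state; primitives can be stored as-is.
    static boolean needsCopy(DataType dt) {
        if (dt instanceof UserDefinedType) {
            // The fix: unwrap the UDT and decide based on its physical type.
            return needsCopy(((UserDefinedType) dt).sqlType);
        }
        return dt instanceof StringType || dt instanceof StructType;
    }

    public static void main(String[] args) {
        // A vector-like UDT backed by a struct needs copying...
        System.out.println(needsCopy(new UserDefinedType(new StructType()))); // true
        // ...while a UDT backed by a primitive does not.
        System.out.println(needsCopy(new UserDefinedType(new IntegerType()))); // false
    }
}
{code}
With the buggy behavior, a UDT falls through to the default (no-copy) case regardless of its underlying representation.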

I've labeled this as a correctness issue because a "missing copying" bug can 
theoretically lead to wrong query results, not just crashes, although I haven't 
been able to contrive a test case demonstrating a wrong result due to this bug.



> CodeGenerator.addBufferedState() does not properly handle UDTs
> --------------------------------------------------------------
>
>                 Key: SPARK-37784
>                 URL: https://issues.apache.org/jira/browse/SPARK-37784
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Major
>              Labels: correctness
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
