[ https://issues.apache.org/jira/browse/SPARK-42937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bruce Robbins updated SPARK-42937:
----------------------------------
    Affects Version/s: 3.3.2

> Join with subquery in condition can fail with wholestage codegen and adaptive execution disabled
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-42937
>                 URL: https://issues.apache.org/jira/browse/SPARK-42937
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.2, 3.5.0
>            Reporter: Bruce Robbins
>            Priority: Major
>
> The following left outer join fails with an error:
> {noformat}
> create or replace temp view v1 as
> select * from values
> (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1),
> (2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2),
> (3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)
> as v1(key, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10);
> create or replace temp view v2 as
> select * from values
> (1, 2),
> (3, 8),
> (7, 9)
> as v2(a, b);
> create or replace temp view v3 as
> select * from values
> (3),
> (8)
> as v3(col1);
> set spark.sql.codegen.maxFields=10; -- default is 100; v1's 11 columns exceed 10, which disables whole-stage codegen
> set spark.sql.adaptive.enabled=false;
> select *
> from v1
> left outer join v2
> on key = a
> and key in (select col1 from v3);
> {noformat}
> The join fails during predicate codegen:
> {noformat}
> 23/03/27 12:24:12 WARN Predicate: Expr codegen error and falling back to interpreter mode
> java.lang.IllegalArgumentException: requirement failed: input[0, int, false] IN subquery#34 has not finished
>       at scala.Predef$.require(Predef.scala:281)
>       at org.apache.spark.sql.execution.InSubqueryExec.prepareResult(subquery.scala:144)
>       at org.apache.spark.sql.execution.InSubqueryExec.doGenCode(subquery.scala:156)
>       at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:201)
>       at scala.Option.getOrElse(Option.scala:189)
>       at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:196)
>       at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$generateExpressions$2(CodeGenerator.scala:1278)
>       at scala.collection.immutable.List.map(List.scala:293)
>       at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1278)
>       at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:41)
>       at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.generate(GeneratePredicate.scala:33)
>       at org.apache.spark.sql.catalyst.expressions.Predicate$.createCodeGeneratedObject(predicates.scala:73)
>       at org.apache.spark.sql.catalyst.expressions.Predicate$.createCodeGeneratedObject(predicates.scala:70)
>       at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:51)
>       at org.apache.spark.sql.catalyst.expressions.Predicate$.create(predicates.scala:86)
>       at org.apache.spark.sql.execution.joins.HashJoin.boundCondition(HashJoin.scala:146)
>       at org.apache.spark.sql.execution.joins.HashJoin.boundCondition$(HashJoin.scala:140)
>       at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.boundCondition$lzycompute(BroadcastHashJoinExec.scala:40)
>       at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.boundCondition(BroadcastHashJoinExec.scala:40)
> {noformat}
> It fails again after fallback to interpreter mode:
> {noformat}
> 23/03/27 12:24:12 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
> java.lang.IllegalArgumentException: requirement failed: input[0, int, false] IN subquery#34 has not finished
>       at scala.Predef$.require(Predef.scala:281)
>       at org.apache.spark.sql.execution.InSubqueryExec.prepareResult(subquery.scala:144)
>       at org.apache.spark.sql.execution.InSubqueryExec.eval(subquery.scala:151)
>       at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:52)
>       at org.apache.spark.sql.execution.joins.HashJoin.$anonfun$boundCondition$2(HashJoin.scala:146)
>       at org.apache.spark.sql.execution.joins.HashJoin.$anonfun$boundCondition$2$adapted(HashJoin.scala:146)
>       at org.apache.spark.sql.execution.joins.HashJoin.$anonfun$outerJoin$1(HashJoin.scala:205)
> {noformat}
> Both the predicate codegen and the evaluation fail for the same reason:
> {{PlanSubqueries}} creates {{InSubqueryExec}} with {{shouldBroadcast=false}}.
> The driver waits for the subquery to finish, but it is the executor that uses
> the subquery's result (for predicate codegen or evaluation). Because
> {{shouldBroadcast}} is false, the result is kept only in a transient field
> ({{InSubqueryExec#result}}), so it is not serialized when the
> {{InSubqueryExec}} instance is sent to the executor.
> When wholestage codegen is enabled, the predicate codegen happens on the
> driver, so the subquery's result is available there. When adaptive execution
> is enabled, {{PlanAdaptiveSubqueries}} always sets {{shouldBroadcast=true}},
> so the subquery's result is broadcast and available on the executor when needed.
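
The Java sketch below illustrates the serialization point above. `FakeInSubqueryExec`, `updateResult`, `eval`, and `roundTrip` are hypothetical names. The class is a simplified stand-in loosely modeled on `InSubqueryExec` and is not Spark's code. Java's `transient` keyword plays the role of the Scala `@transient` field. A plain serializable field stands in for the broadcast variable; this is an assumption, since a real Spark broadcast ships a handle and the executor fetches the value separately. A Java serialization round trip through a byte array simulates sending the instance from the driver to an executor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientResultDemo {

    // Hypothetical, simplified stand-in for InSubqueryExec (not Spark's class).
    static final class FakeInSubqueryExec implements Serializable {
        private static final long serialVersionUID = 1L;

        private final boolean shouldBroadcast;
        // Like InSubqueryExec#result: transient, so Java serialization drops it.
        private transient int[] result;
        // Assumption: a plain serializable field standing in for the broadcast variable.
        private int[] resultBroadcast;

        FakeInSubqueryExec(boolean shouldBroadcast) {
            this.shouldBroadcast = shouldBroadcast;
        }

        // Driver side: record the finished subquery's rows.
        void updateResult(int[] rows) {
            result = rows;
            if (shouldBroadcast) {
                resultBroadcast = rows;
            }
        }

        // The same kind of check as the require(...) in the stack traces above.
        private int[] prepareResult() {
            if (result == null && resultBroadcast == null) {
                throw new IllegalArgumentException("requirement failed: subquery has not finished");
            }
            if (result == null) {
                result = resultBroadcast;
            }
            return result;
        }

        // Executor side: evaluate `key IN (subquery)`.
        boolean eval(int key) {
            for (int v : prepareResult()) {
                if (v == key) {
                    return true;
                }
            }
            return false;
        }
    }

    // Simulates shipping an object from the driver to an executor via Java serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        for (boolean shouldBroadcast : new boolean[] {false, true}) {
            FakeInSubqueryExec onDriver = new FakeInSubqueryExec(shouldBroadcast);
            onDriver.updateResult(new int[] {3, 8}); // the rows of v3
            FakeInSubqueryExec onExecutor = roundTrip(onDriver);
            try {
                System.out.println("shouldBroadcast=" + shouldBroadcast
                        + ": 3 IN subquery = " + onExecutor.eval(3));
            } catch (IllegalArgumentException e) {
                System.out.println("shouldBroadcast=" + shouldBroadcast + ": " + e.getMessage());
            }
        }
    }
}
```

With `shouldBroadcast=false`, the deserialized copy has lost the transient result, so `eval` hits the "has not finished" requirement failure. With `shouldBroadcast=true`, the copy still carries the result. This matches the difference described between {{PlanSubqueries}} and {{PlanAdaptiveSubqueries}}.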



