[ 
https://issues.apache.org/jira/browse/SPARK-37621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Krogen updated SPARK-37621:
--------------------------------
    Description: 
I am getting an error when I try to persist the results of a join operation.
Note that both the tables being joined and the output table are Iceberg tables.

SQL code to reproduce:
{code}
String sqlJoin = String.format(
        "SELECT * from " +
                "((select %s from %s.%s where %s ) lllll " +
                "join (select %s from %s.%s where %s ) rrrrr " +
                "using (%s))",
        ........);
spark.sql(sqlJoin).writeTo("ciptest.ttt").option("write-format", "parquet").createOrReplace();
{code}
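For readers who want to see what the template expands to, here is a minimal sketch in plain Java (no Spark needed). The argument list is elided in the report, so the parameter names and every value below are hypothetical, chosen only to show how the nine `%s` placeholders might be filled; they are not the reporter's actual columns, tables, or predicates.

```java
final class JoinSqlTemplateDemo {

    // Same format string as in the report; the parameter names are assumptions
    // about what each %s stands for, since the original argument list is elided.
    static String buildJoinSql(String leftCols, String leftDb, String leftTable, String leftPredicate,
                               String rightCols, String rightDb, String rightTable, String rightPredicate,
                               String joinCols) {
        return String.format(
                "SELECT * from " +
                        "((select %s from %s.%s where %s ) lllll " +
                        "join (select %s from %s.%s where %s ) rrrrr " +
                        "using (%s))",
                leftCols, leftDb, leftTable, leftPredicate,
                rightCols, rightDb, rightTable, rightPredicate,
                joinCols);
    }

    public static void main(String[] args) {
        // Hypothetical values for illustration only.
        String sql = buildJoinSql(
                "id, name", "db", "left_tbl", "id > 0",
                "id, amount", "db", "right_tbl", "amount > 0",
                "id");
        System.out.println(sql);
    }
}
```

The resulting string is what would be passed to `spark.sql(...)` in the repro above.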

My exception stack is:
{code}
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:64)
        at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at ….
{code}
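The trace shows the shuffle serializer's `writeValue` failing while casting a row to `UnsafeRow`. The general mechanism can be sketched in plain Java: two sibling subclasses of a common row type, and a writer that blindly downcasts to one of them. The classes below are hypothetical stand-ins, not Spark's real `InternalRow` hierarchy, and this illustrates only the failure mode, not the root cause of this issue.

```java
final class RowCastDemo {

    // Hypothetical stand-ins for a common row base type and two sibling formats.
    static abstract class Row {}
    static final class GenericRow extends Row {}
    static final class UnsafeLikeRow extends Row {}

    // A writer that assumes every incoming value is already in the "unsafe" format.
    static String writeValue(Object value) {
        UnsafeLikeRow row = (UnsafeLikeRow) value; // throws ClassCastException for a GenericRow
        return "wrote " + row.getClass().getSimpleName();
    }

    // Helper that reports whether writing the given value fails with a ClassCastException.
    static boolean failsWithClassCast(Object value) {
        try {
            writeValue(value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(writeValue(new UnsafeLikeRow()));      // cast succeeds
        System.out.println(failsWithClassCast(new GenericRow())); // cast fails
    }
}
```

In other words, the writer is correct only if every upstream operator hands it rows of the expected concrete type; a row of the sibling type reaching it unconverted produces exactly this kind of exception.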

Running EXPLAIN on the SQL statement produces the following plan:
{code}
== Physical Plan ==
Project [ ... ]
+- SortMergeJoin […], Inner
  :- Sort […], false, 0
  : +- Exchange hashpartitioning(…, 10), ENSURE_REQUIREMENTS, [id=#38]
  :   +- Filter (…)
  :    +- BatchScan[... ] left [filters=…]
  +- *(2) Sort […], false, 0
   +- Exchange hashpartitioning(…, 10), ENSURE_REQUIREMENTS, [id=#47]
     +- *(1) Filter (…)
      +- BatchScan[…] right [filters=…] 
{code}

Note that several variations of this also fail. Besides the repro code listed
above, I have tried CTAS and writing the result directly to Parquet files
without creating a table from it.


> ClassCastException when trying to persist the result of a join between two 
> Iceberg tables
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-37621
>                 URL: https://issues.apache.org/jira/browse/SPARK-37621
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 3.1.2
>            Reporter: Ciprian Gerea
>            Priority: Major


