[ https://issues.apache.org/jira/browse/SPARK-37621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Erik Krogen updated SPARK-37621:
--------------------------------
Description:
I am getting an error when I try to persist the result of a join operation.
Note that both tables to be joined and the output table are Iceberg tables.
Code to reproduce:
{code}
String sqlJoin = String.format(
    "SELECT * from " +
    "((select %s from %s.%s where %s ) lllll " +
    "join (select %s from %s.%s where %s ) rrrrr " +
    "using (%s))",
    ........);
spark.sql(sqlJoin).writeTo("ciptest.ttt").option("write-format", "parquet").createOrReplace();
{code}
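For readers who want to see the shape of the generated statement, here is a minimal sketch of how the nine placeholders in the format string could be filled in. The arguments are elided in the report, so every column, database, table, and filter value below is a hypothetical stand-in, and the helper name `buildJoin` is an assumption for illustration only.

```java
final class JoinQuerySketch {
    // Fills the format string from the report. All arguments are
    // hypothetical placeholders; the report elides the real values.
    public static String buildJoin(
            String leftCols, String leftDb, String leftTable, String leftFilter,
            String rightCols, String rightDb, String rightTable, String rightFilter,
            String joinKey) {
        return String.format(
            "SELECT * from " +
            "((select %s from %s.%s where %s ) lllll " +
            "join (select %s from %s.%s where %s ) rrrrr " +
            "using (%s))",
            leftCols, leftDb, leftTable, leftFilter,
            rightCols, rightDb, rightTable, rightFilter,
            joinKey);
    }

    public static void main(String[] args) {
        // Example values only; they are not taken from the report.
        System.out.println(buildJoin(
            "id, a", "db", "left_t", "a > 0",
            "id, b", "db", "right_t", "b > 0",
            "id"));
    }
}
```

The resulting string is what would be passed to `spark.sql(...)` in the repro above.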
My exception stack is:
{code}
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:64)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at ….
{code}
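The top frame of the trace points at a downcast inside the shuffle serializer's `writeValue`: it receives a `GenericInternalRow` where it expects an `UnsafeRow`. The sketch below reproduces only that mechanism in plain Java. The three row classes are hypothetical stand-ins, not Spark's real types, and the sketch makes no claim about why the wrong row type reaches the shuffle in this report.

```java
final class RowCastSketch {
    // Hypothetical stand-ins for Spark's row hierarchy; not the real classes.
    static class InternalRow {}
    static class UnsafeRow extends InternalRow {}
    static class GenericInternalRow extends InternalRow {}

    // Mimics a serializer that assumes every incoming row is an UnsafeRow.
    public static String writeValue(Object value) {
        UnsafeRow row = (UnsafeRow) value; // ClassCastException for any other type
        return "wrote " + row.getClass().getSimpleName();
    }

    // Returns true when writing the value fails with a ClassCastException.
    public static boolean failsWithCast(Object value) {
        try {
            writeValue(value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithCast(new UnsafeRow()));
        System.out.println(failsWithCast(new GenericInternalRow()));
    }
}
```

In this stand-in, the cast succeeds only when the upstream producer has already converted its rows to the expected subtype.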
Running EXPLAIN on the SQL statement gives the following plan:
{code}
== Physical Plan ==
Project [ ... ]
+- SortMergeJoin […], Inner
   :- Sort […], false, 0
   :  +- Exchange hashpartitioning(…, 10), ENSURE_REQUIREMENTS, [id=#38]
   :     +- Filter (…)
   :        +- BatchScan[... ] left [filters=…]
   +- *(2) Sort […], false, 0
      +- Exchange hashpartitioning(…, 10), ENSURE_REQUIREMENTS, [id=#47]
         +- *(1) Filter (…)
            +- BatchScan[…] right [filters=…]
{code}
Note that several variations of this also fail. Besides the repro code listed
above, I have tried CTAS and writing the result directly to Parquet files
without creating a table.
> ClassCastException when trying to persist the result of a join between two
> Iceberg tables
> -----------------------------------------------------------------------------------------
>
> Key: SPARK-37621
> URL: https://issues.apache.org/jira/browse/SPARK-37621
> Project: Spark
> Issue Type: Bug
> Components: Input/Output
> Affects Versions: 3.1.2
> Reporter: Ciprian Gerea
> Priority: Major
>