[
https://issues.apache.org/jira/browse/SPARK-36932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17427005#comment-17427005
]
chendihao commented on SPARK-36932:
-----------------------------------
It is not related to the join operation. Here is a simpler case that reproduces
the issue with a single DataFrame.
{code:scala}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, LongType, StructField,
StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
 * Minimal reproduction for SPARK-36932 that uses a single DataFrame (no join).
 *
 * The schema deliberately contains two fields that are both named "col1" but
 * have different types (IntegerType and LongType). The rows are grouped with
 * `groupByKey` and reduced with `mapGroups`, which is encoded with a
 * `RowEncoder` built from that same schema. According to the report, this
 * pipeline fails inside SchemaPruning with "Failed to merge incompatible data
 * types".
 *
 * NOTE(review): the object name looks like a typo for "SchemaPruningIssue".
 * It is kept unchanged so that existing references to it still resolve.
 */
object SchemaPuringIssue {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .getOrCreate()
    try {
      // Use an uppercase `L` for Long literals. A lowercase `l` is easily
      // misread as the digit `1`, and Scala 2.13 warns about it.
      val data1 = Seq(
        Row(1, 1L),
        Row(2, 2L))
      // Two fields share the name "col1" but differ in type: this is the trigger.
      val schema1 = StructType(List(
        StructField("col1", IntegerType),
        StructField("col1", LongType)))
      val df = spark.createDataFrame(spark.sparkContext.makeRDD(data1), schema1)
      df.show()

      // Brings the implicit Encoder[Int] needed by groupByKey into scope.
      import spark.implicits._
      // For each key (the first column), keep the row whose first column is largest.
      // The result encoder is passed explicitly because the output type is Row.
      val distinct = df
        .groupByKey { row => row.getInt(0) }
        .mapGroups { case (_, iter) =>
          iter.maxBy(row => row.getInt(0))
        }(RowEncoder(df.schema))
      distinct.show()
    } finally {
      // Release the local SparkSession even when the reproduction throws;
      // the original exception still propagates after this runs.
      spark.stop()
    }
  }
}
{code}
It seems that the Catalyst optimizer applies SchemaPruning, which raises an
exception when the schema contains fields with the same name but different types.
> Misuse "merge schema" when mapGroups
> ------------------------------------
>
> Key: SPARK-36932
> URL: https://issues.apache.org/jira/browse/SPARK-36932
> Project: Spark
> Issue Type: Bug
> Components: Java API
> Affects Versions: 3.0.0
> Reporter: Wang Zekai
> Priority: Major
>
> {code:java}
> // Test case for this bug
> val spark = SparkSession.builder().master("local[*]").getOrCreate()
> val data1 = Seq(
> Row("0", 1),
> Row("0", 2))
> val schema1 = StructType(List(
> StructField("col0", StringType),
> StructField("col1", IntegerType))
> )
> val data2 = Seq(
> Row("0", 1),
> Row("0", 2))
> val schema2 = StructType(List(
> StructField("str0", StringType),
> StructField("col0", IntegerType))
> )
> val df1 = spark.createDataFrame(spark.sparkContext.makeRDD(data1), schema1)
> val df2 = spark.createDataFrame(spark.sparkContext.makeRDD(data2), schema2)
> val joined = df1.join(df2, df1("col0") === df2("str0"), "left")
> import spark.implicits._
> val distinct = joined
> .groupByKey {
> row => row.getInt(1)
> }
> .mapGroups {
> case (_, iter) =>
> iter.maxBy(row => {
> row.getInt(3)
> })
> }(RowEncoder(joined.schema))
> distinct.show(){code}
> {code:java}
> // A part of errors
> Exception in thread "main" org.apache.spark.SparkException: Failed to merge fields 'col0' and 'col0'. Failed to merge incompatible data types string and int
> at org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593)
> at scala.Option.map(Option.scala:163)
> at org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:585)
> at org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:582)
> at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
> at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
> at org.apache.spark.sql.types.StructType$.merge(StructType.scala:582)
> at org.apache.spark.sql.types.StructType.merge(StructType.scala:492)
> at org.apache.spark.sql.catalyst.expressions.SchemaPruning$.$anonfun$pruneDataSchema$2(SchemaPruning.scala:36)
> at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
> at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
> at scala.collection.immutable.List.foldLeft(List.scala:89)
> at scala.collection.LinearSeqOptimized.reduceLeft(LinearSeqOptimized.scala:140)
> at scala.collection.LinearSeqOptimized.reduceLeft$(LinearSeqOptimized.scala:138)
> at scala.collection.immutable.List.reduceLeft(List.scala:89)
> {code}
> After a left join of two DataFrames whose schemas contain columns with the same
> name but different types, we use groupByKey and mapGroups to get the result, but
> it fails with the error above. Is this a mistake in my usage? If not, I think it
> may be related to the schema merge in StructType.scala:593. How can I turn off
> schema merging?
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]