[
https://issues.apache.org/jira/browse/SPARK-17806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15552830#comment-15552830
]
Josh Rosen commented on SPARK-17806:
------------------------------------
I was able to confirm that this is still a problem as of 2.0.1. To rule out the
possibility of this being a self-join bug, I rewrote this to read the parquet
file twice, then joined those tables together. Here's the query plan:
{code}
== Parsed Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
+- Join LeftOuter, (((a#1899 = a#1906) && (b#1900 = b#1907)) && (c#1901L =
c#1908L))
:- Relation[a#1899,b#1900,c#1901L] parquet
+- Relation[a#1906,b#1907,c#1908L] parquet
== Analyzed Logical Plan ==
a: int, b: int, c: bigint, a: int, b: int, c: bigint
GlobalLimit 21
+- LocalLimit 21
+- Join LeftOuter, (((a#1899 = a#1906) && (b#1900 = b#1907)) && (c#1901L =
c#1908L))
:- Relation[a#1899,b#1900,c#1901L] parquet
+- Relation[a#1906,b#1907,c#1908L] parquet
== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
+- Join LeftOuter, (((a#1899 = a#1906) && (b#1900 = b#1907)) && (c#1901L =
c#1908L))
:- LocalLimit 21
: +- Relation[a#1899,b#1900,c#1901L] parquet
+- Relation[a#1906,b#1907,c#1908L] parquet
== Physical Plan ==
CollectLimit 21
+- *BroadcastHashJoin [a#1899, b#1900, c#1901L], [a#1906, b#1907, c#1908L],
LeftOuter, BuildRight
:- *LocalLimit 21
: +- *BatchedScan parquet [a#1899,b#1900,c#1901L] Format: ParquetFormat,
InputPaths: dbfs:/tmp/test, PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<a:int,b:int,c:bigint>
+- BroadcastExchange
HashedRelationBroadcastMode(List((shiftleft((shiftleft(cast(input[0, int, true]
as bigint), 32) | (cast(input[1, int, true] as bigint) & 4294967295)), 64) |
(cast(input[2, bigint, true] as bigint) & 0))))
+- *BatchedScan parquet [a#1906,b#1907,c#1908L] Format: ParquetFormat,
InputPaths: dbfs:/tmp/test, PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<a:int,b:int,c:bigint>
{code}
> Incorrect result when work with data from parquet
> -------------------------------------------------
>
> Key: SPARK-17806
> URL: https://issues.apache.org/jira/browse/SPARK-17806
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.0, 2.0.1
> Reporter: Vitaly Gerasimov
> Priority: Critical
> Labels: correctness
>
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.types.{StructField, StructType}
> import org.apache.spark.sql.types.DataTypes._
> val sc = SparkSession.builder().config(new
> SparkConf().setMaster("local")).getOrCreate()
> val jsonRDD = sc.sparkContext.parallelize(Seq(
> """{"a":1,"b":1,"c":1}""",
> """{"a":1,"b":1,"c":2}"""
> ))
> sc.read.schema(StructType(Seq(
> StructField("a", IntegerType),
> StructField("b", IntegerType),
> StructField("c", LongType)
> ))).json(jsonRDD).write.parquet("/tmp/test")
> val df = sc.read.load("/tmp/test")
> df.join(df, Seq("a", "b", "c"), "left_outer").show()
> {code}
> returns:
> {code}
> +---+---+---+
> | a| b| c|
> +---+---+---+
> | 1| 1| 1|
> | 1| 1| 1|
> | 1| 1| 2|
> | 1| 1| 2|
> +---+---+---+
> {code}
> Expected result:
> {code}
> +---+---+---+
> | a| b| c|
> +---+---+---+
> | 1| 1| 1|
> | 1| 1| 2|
> +---+---+---+
> {code}
> If I use this code without saving to parquet it works fine. If you change
> type of `c` column to `IntegerType` it also works fine.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]