[
https://issues.apache.org/jira/browse/SPARK-32929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371997#comment-17371997
]
Simrit Kaur commented on SPARK-32929:
-------------------------------------
I am also facing this issue with Spark 3.1.1 on IBM Z. It seems that Java
isn't shielding us from the endianness impact. Can we make the changes to the
code suggested here to fix this? Any other suggestions or hints would be
highly appreciated.
> StreamSuite failure on IBM Z: - SPARK-20432: union one stream with itself
> -------------------------------------------------------------------------
>
> Key: SPARK-32929
> URL: https://issues.apache.org/jira/browse/SPARK-32929
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.1
> Environment: openjdk version "11.0.8" 2020-07-14
> OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.8+10)
> OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.8+10, mixed mode)
> Linux 4.15.0-117-generic #118-Ubuntu SMP Fri Sep 4 20:00:20 UTC 2020 s390x
> s390x s390x GNU/Linux
> Reporter: Michael Munday
> Priority: Minor
> Labels: big-endian
>
> I am getting zeros in the output of this test on IBM Z. This is a big-endian
> system. See error below.
> I think this issue is related to the use of {{IntegerType}} in the schema for
> {{FakeDefaultSource}}. Modifying the schema to use {{LongType}} fixes the
> issue. Another workaround is to remove {{.select("a")}} (see patch below).
> My working theory is that long data (longs are generated by Range) is being
> read using unsafe int operations (as specified in the schema). This would
> 'work' on little-endian systems but not big-endian systems. I'm still working
> to figure out what the mechanism is and I'd appreciate any hints or insights.
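> If that theory is right, the failure should be reproducible outside Spark. The
> sketch below (plain java.nio rather than Spark internals; the object name is
> made up for illustration) stores a small long in native byte order and reads
> the same offset back as a 4-byte int, which is essentially what an
> {{IntegerType}} read against long-typed data would do:
> {code:scala}
> import java.nio.{ByteBuffer, ByteOrder}
>
> object EndianReadDemo {
>   def main(args: Array[String]): Unit = {
>     // Store 5L in native byte order, as an unsafe row buffer would.
>     val buf = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder())
>     buf.putLong(0, 5L)
>
>     // Read the same slot back as a 4-byte int, mimicking an
>     // IntegerType read of data that was written as a long.
>     val asInt = buf.getInt(0)
>
>     // Little-endian: the low-order bytes come first, so asInt == 5.
>     // Big-endian (e.g. s390x): the high-order bytes come first, so asInt == 0.
>     println(s"native order: ${ByteOrder.nativeOrder()}, read as int: $asInt")
>   }
> }
> {code}
> Every value in 0-10 fits entirely in the low four bytes of its long, so on a
> big-endian machine a four-byte read at offset 0 sees only the zeroed high
> bytes, which would explain the all-zero output below.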
> The error looks like this:
> {noformat}
> - SPARK-20432: union one stream with itself *** FAILED ***
> Decoded objects do not match expected objects:
>   expected: WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>   actual:   WrappedArray(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
>   assertnotnull(upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long"))
>   +- upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long")
>      +- getcolumnbyordinal(0, LongType) (QueryTest.scala:88)
> {noformat}
> This change fixes the issue:
> {code:java}
> --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
> @@ -45,7 +45,7 @@ import org.apache.spark.sql.functions._
>  import org.apache.spark.sql.internal.SQLConf
>  import org.apache.spark.sql.sources.StreamSourceProvider
>  import org.apache.spark.sql.streaming.util.{BlockOnStopSourceProvider, StreamManualClock}
> -import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
> +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
>  import org.apache.spark.util.Utils
>
>  class StreamSuite extends StreamTest {
> @@ -1265,7 +1265,7 @@ class StreamSuite extends StreamTest {
>  }
>
>  abstract class FakeSource extends StreamSourceProvider {
> -  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
> +  private val fakeSchema = StructType(StructField("a", LongType) :: Nil)
>
>    override def sourceSchema(
>        spark: SQLContext,
> @@ -1287,7 +1287,7 @@ class FakeDefaultSource extends FakeSource {
>      new Source {
>        private var offset = -1L
>
> -      override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
> +      override def schema: StructType = StructType(StructField("a", LongType) :: Nil)
>
>        override def getOffset: Option[Offset] = {
>          if (offset >= 10) {
> {code}
> Alternatively, this change also fixes the issue:
> {code:java}
> --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
> @@ -154,7 +154,7 @@ class StreamSuite extends StreamTest {
>    }
>
>    test("SPARK-20432: union one stream with itself") {
> -    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
> +    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
>      val unioned = df.union(df)
>      withTempDir { outputDir =>
>        withTempDir { checkpointDir =>
> {code}