[ https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Grant Henke updated SPARK-26880: -------------------------------- Description: I have seen a simple case where InternalRows returned by `queryExecution.toRdd` are corrupt. Some rows are duplicated while others are missing. This simple test illustrates the issue: {code} import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StructType import org.junit.Assert._ import org.junit.Test import org.scalatest.Matchers import org.scalatest.junit.JUnitSuite import org.slf4j.Logger import org.slf4j.LoggerFactory class SparkTest extends JUnitSuite with Matchers { val Log: Logger = LoggerFactory.getLogger(getClass) @Test def testSparkRowCorruption(): Unit = { val conf = new SparkConf() .setMaster("local[*]") .setAppName("test") .set("spark.ui.enabled", "false") val ss = SparkSession.builder().config(conf).getOrCreate() // Setup a DataFrame for testing. val data = Seq( Row.fromSeq(Seq(0, "0")), Row.fromSeq(Seq(25, "25")), Row.fromSeq(Seq(50, "50")), Row.fromSeq(Seq(75, "75")), Row.fromSeq(Seq(99, "99")), Row.fromSeq(Seq(100, "100")), Row.fromSeq(Seq(101, "101")), Row.fromSeq(Seq(125, "125")), Row.fromSeq(Seq(150, "150")), Row.fromSeq(Seq(175, "175")), Row.fromSeq(Seq(199, "199")) ) val dataRDD = ss.sparkContext.parallelize(data) val schema = StructType( Seq( StructField("key", IntegerType), StructField("value", StringType) )) val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema) // Convert to an RDD. val rdd = dataDF.queryExecution.toRdd // Collect the data to compare. val resultData = rdd.collect resultData.foreach { row => // Log for visualizing the corruption. Log.error(s"${row.getInt(0)}") } // Ensure the keys in the original data and resulting data match. 
val dataKeys = data.map(_.getInt(0)).toSet val resultKeys = resultData.map(_.getInt(0)).toSet assertEquals(dataKeys, resultKeys) } } {code} That test fails with the following: {noformat} 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199 expected:<Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)> but was:<Set(0, 25, 125, 150, 199, 99, 75, 100)> Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100) Actual :Set(0, 25, 125, 150, 199, 99, 75, 100) {noformat} If I map from an InternalRow to a Row, the issue goes away: {code} val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows => val encoder = RowEncoder.apply(schema).resolveAndBind() internalRows.map(encoder.fromRow) } {code} was: I have seen a simple case where InternalRows returned by `queryExecution.toRdd` are corrupt. Some rows are duplicated while others are missing. 
This simple test illustrates the issue: {code:scala} package org.apache.kudu.spark.kudu import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StructType import org.junit.Assert._ import org.junit.Test import org.scalatest.Matchers import org.scalatest.junit.JUnitSuite import org.slf4j.Logger import org.slf4j.LoggerFactory class SparkTest extends JUnitSuite with Matchers { val Log: Logger = LoggerFactory.getLogger(getClass) @Test def testSparkRowCorruption(): Unit = { val conf = new SparkConf() .setMaster("local[*]") .setAppName("test") .set("spark.ui.enabled", "false") val ss = SparkSession.builder().config(conf).getOrCreate() // Setup a DataFrame for testing. val data = Seq( Row.fromSeq(Seq(0, "0")), Row.fromSeq(Seq(25, "25")), Row.fromSeq(Seq(50, "50")), Row.fromSeq(Seq(75, "75")), Row.fromSeq(Seq(99, "99")), Row.fromSeq(Seq(100, "100")), Row.fromSeq(Seq(101, "101")), Row.fromSeq(Seq(125, "125")), Row.fromSeq(Seq(150, "150")), Row.fromSeq(Seq(175, "175")), Row.fromSeq(Seq(199, "199")) ) val dataRDD = ss.sparkContext.parallelize(data) val schema = StructType( Seq( StructField("key", IntegerType), StructField("value", StringType) )) val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema) // Convert to an RDD. val rdd = dataDF.queryExecution.toRdd // Collect the data to compare. val resultData = rdd.collect resultData.foreach { row => // Log for visualizing the corruption. Log.error(s"${row.getInt(0)}") } // Ensure the keys in the original data and resulting data match. 
val dataKeys = data.map(_.getInt(0)).toSet val resultKeys = resultData.map(_.getInt(0)).toSet assertEquals(dataKeys, resultKeys) } } {code} That test fails with the following: {noformat} 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199 expected:<Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)> but was:<Set(0, 25, 125, 150, 199, 99, 75, 100)> Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100) Actual :Set(0, 25, 125, 150, 199, 99, 75, 100) {noformat} If I map from an InternalRow to a Row, the issue goes away: {code:scala} val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows => val encoder = RowEncoder.apply(schema).resolveAndBind() internalRows.map(encoder.fromRow) } {code} > dataDF.queryExecution.toRdd corrupt rows > ---------------------------------------- > > Key: SPARK-26880 > URL: https://issues.apache.org/jira/browse/SPARK-26880 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.4.0 > Reporter: Grant Henke > Priority: Major > > I have seen a simple case where InternalRows returned by > `queryExecution.toRdd` are corrupt. Some rows are duplicated while others are > missing. 
> This simple test illustrates the issue: > {code} > import org.apache.spark.SparkConf > import org.apache.spark.sql.Row > import org.apache.spark.sql.SparkSession > import org.apache.spark.sql.catalyst.encoders.RowEncoder > import org.apache.spark.sql.types.IntegerType > import org.apache.spark.sql.types.StringType > import org.apache.spark.sql.types.StructField > import org.apache.spark.sql.types.StructType > import org.junit.Assert._ > import org.junit.Test > import org.scalatest.Matchers > import org.scalatest.junit.JUnitSuite > import org.slf4j.Logger > import org.slf4j.LoggerFactory > class SparkTest extends JUnitSuite with Matchers { > val Log: Logger = LoggerFactory.getLogger(getClass) > @Test > def testSparkRowCorruption(): Unit = { > val conf = new SparkConf() > .setMaster("local[*]") > .setAppName("test") > .set("spark.ui.enabled", "false") > val ss = SparkSession.builder().config(conf).getOrCreate() > // Setup a DataFrame for testing. > val data = Seq( > Row.fromSeq(Seq(0, "0")), > Row.fromSeq(Seq(25, "25")), > Row.fromSeq(Seq(50, "50")), > Row.fromSeq(Seq(75, "75")), > Row.fromSeq(Seq(99, "99")), > Row.fromSeq(Seq(100, "100")), > Row.fromSeq(Seq(101, "101")), > Row.fromSeq(Seq(125, "125")), > Row.fromSeq(Seq(150, "150")), > Row.fromSeq(Seq(175, "175")), > Row.fromSeq(Seq(199, "199")) > ) > val dataRDD = ss.sparkContext.parallelize(data) > val schema = StructType( > Seq( > StructField("key", IntegerType), > StructField("value", StringType) > )) > val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema) > // Convert to an RDD. > val rdd = dataDF.queryExecution.toRdd > > // Collect the data to compare. > val resultData = rdd.collect > resultData.foreach { row => > // Log for visualizing the corruption. > Log.error(s"${row.getInt(0)}") > } > // Ensure the keys in the original data and resulting data match. 
> val dataKeys = data.map(_.getInt(0)).toSet > val resultKeys = resultData.map(_.getInt(0)).toSet > assertEquals(dataKeys, resultKeys) > } > } > {code} > That test fails with the following: > {noformat} > 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0 > 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25 > 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75 > 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75 > 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99 > 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100 > 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125 > 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125 > 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150 > 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199 > 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199 > expected:<Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)> but > was:<Set(0, 25, 125, 150, 199, 99, 75, 100)> > Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100) > Actual :Set(0, 25, 125, 150, 199, 99, 75, 100) > {noformat} > If I map from an InternalRow to a Row, the issue goes away: > {code} > val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows => > val encoder = RowEncoder.apply(schema).resolveAndBind() > internalRows.map(encoder.fromRow) > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org