[ https://issues.apache.org/jira/browse/SPARK-26436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Marcelo Vanzin updated SPARK-26436:
-----------------------------------
    Component/s:     (was: Spark Core)
                 SQL

> Dataframe resulting from a GroupByKey and flatMapGroups operation throws
> java.lang.UnsupportedException when groupByKey is applied on it.
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26436
>                 URL: https://issues.apache.org/jira/browse/SPARK-26436
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Manish
>            Priority: Major
>
> There seems to be a bug in the groupByKey API when it is applied to a
> Dataset produced by an earlier groupByKey and flatMapGroups invocation.
> In such cases groupByKey throws the following exception:
> java.lang.UnsupportedOperationException: fieldIndex on a Row without schema
> is undefined.
>
> The DataFrame nevertheless has a valid schema, and groupBy("key") or
> repartition($"key") calls on the same DataFrame and key succeed.
>
> Following is the code that reproduces the scenario:
>
> {code:scala}
> import org.apache.spark.sql.catalyst.encoders.RowEncoder
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
> import scala.collection.mutable.ListBuffer
>
> object Test {
>   def main(args: Array[String]): Unit = {
>     val values = List(List("1", "One"), List("1", "Two"), List("2", "Three"), List("2", "4")).map(x => (x(0), x(1)))
>     val session = SparkSession.builder.config("spark.master", "local").getOrCreate
>     import session.implicits._
>     val dataFrame = values.toDF
>     dataFrame.show()
>     dataFrame.printSchema()
>     val newSchema = StructType(dataFrame.schema.fields
>       ++ Array(
>         StructField("Count", IntegerType, false)
>       )
>     )
>     val expr = RowEncoder.apply(newSchema)
>     val tranform = dataFrame.groupByKey(row => row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
>       val inputSeq = inputItr.toSeq
>       val length = inputSeq.size
>       var listBuff = new ListBuffer[Row]()
>       var counter: Int = 0
>       for (i <- 0 until (length)) {
>         counter += 1
>       }
>       for (i <- 0 until length) {
>         var x = inputSeq(i)
>         listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
>       }
>       listBuff.iterator
>     })(expr)
>     tranform.show
>     val newSchema1 = StructType(tranform.schema.fields
>       ++ Array(
>         StructField("Count1", IntegerType, false)
>       )
>     )
>     val expr1 = RowEncoder.apply(newSchema1)
>     val tranform2 = tranform.groupByKey(row => row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
>       val inputSeq = inputItr.toSeq
>       val length = inputSeq.size
>       var listBuff = new ListBuffer[Row]()
>       var counter: Int = 0
>       for (i <- 0 until (length)) {
>         counter += 1
>       }
>       for (i <- 0 until length) {
>         var x = inputSeq(i)
>         listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
>       }
>       listBuff.iterator
>     })(expr1)
>     tranform2.show
>   }
> }
> Test.main(null)
> {code}
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
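
The exception text in the report ("fieldIndex on a Row without schema is undefined") can be observed in isolation, without a SparkSession, by looking up a field by name on a Row that was built with Row.fromSeq. The sketch below is only an illustration of that one behaviour, not a diagnosis of the issue: the object name SchemaLessRowSketch, the helper keyByName, and the sample values are hypothetical, and whether the rows reaching the second groupByKey in the reproduction are in fact schema-less is an assumption.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import scala.util.Try

// Hypothetical helper object, not part of the reported reproduction.
object SchemaLessRowSketch {
  // Schema mirroring the shape used in the report: two string columns plus "Count".
  val schema: StructType = StructType(Seq(
    StructField("_1", StringType),
    StructField("_2", StringType),
    StructField("Count", IntegerType, nullable = false)))

  // Sample values chosen for illustration only.
  val values: Array[Any] = Array("1", "One", 2)

  // Row.fromSeq builds a row that carries no schema.
  val bare: Row = Row.fromSeq(values.toSeq)

  // The same values wrapped together with an explicit schema.
  val withSchema: Row = new GenericRowWithSchema(values, schema)

  // Look up the grouping key by field name, capturing any exception.
  def keyByName(row: Row): Try[String] = Try(row.getAs[String]("_1"))

  def main(args: Array[String]): Unit = {
    // Lookup by name needs a schema, so this is a Failure for the bare row.
    println(keyByName(bare))
    // Positional access does not consult the schema.
    println(bare.getString(0))
    // With a schema attached, lookup by name succeeds.
    println(keyByName(withSchema))
  }
}
```

If the assumption holds, reading the key by position (row.getString(0)) in the second groupByKey, or emitting rows that carry a schema from flatMapGroups, might sidestep the exception; neither option has been verified against the reproduction above.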