[ 
https://issues.apache.org/jira/browse/SPARK-26436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747143#comment-16747143
 ] 

Manish commented on SPARK-26436:
--------------------------------

[~viirya]: What I meant is that the same result as groupByKey with 
flatMapGroups can also be achieved with repartition followed by mapPartitions, 
in which case Spark does not throw the IllegalArgumentException. Here is the 
same code written with repartition:


{code:java}
import java.util

import com.google.common.collect.Lists
import org.apache.spark.api.java.function.MapPartitionsFunction
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

import scala.collection.mutable



  /**
   * Reproduction for SPARK-26436 written with `repartition` + `mapPartitions`
   * instead of `groupByKey` + `flatMapGroups`.
   *
   * A two-column DataFrame of string pairs is repartitioned by column `_1`,
   * and each row gets an extra integer column holding the number of rows that
   * share its `_1` value within the partition. The same step is then applied
   * a second time to the result, and the final Dataset is shown.
   */
  object Test {

    /** Name of the column used both for repartitioning and for grouping. */
    private val KeyColumn = "_1"

    /**
     * Returns `schema` with one additional nullable integer column appended.
     *
     * @param schema the schema to extend
     * @param name   name of the appended column
     */
    private def withIntColumn(schema: StructType, name: String): StructType =
      StructType(schema.fields :+ StructField(name, IntegerType, nullable = true))

    /**
     * Builds the partition function shared by both transformation steps.
     *
     * The function groups the partition's rows by their `_1` value and emits
     * every input row once, extended with the size of its group.
     */
    private def appendGroupCount: MapPartitionsFunction[Row, Row] =
      new MapPartitionsFunction[Row, Row] {

        override def call(iterator: util.Iterator[Row]): util.Iterator[Row] = {
          val groups = mutable.HashMap.empty[String, mutable.ArrayBuffer[Row]]

          while (iterator.hasNext) {
            val row = iterator.next()
            // A fresh buffer is created per key. The previous code used
            // withDefaultValue(mutable.MutableList.empty[Row]), whose argument
            // is evaluated once, so every key in a partition ended up sharing
            // the same list: rows were emitted once per key and the count was
            // the partition's total row count rather than the group's.
            groups.getOrElseUpdate(
              row.getAs[String](KeyColumn),
              mutable.ArrayBuffer.empty[Row]
            ) += row
          }

          val result = new util.ArrayList[Row]()
          for ((_, group) <- groups) {
            val count = group.size
            group.foreach(row => result.add(Row.fromSeq(row.toSeq :+ count)))
          }
          result.iterator
        }
      }

    /**
     * Runs the reproduction: builds the input, applies the counting step twice
     * and prints the final Dataset.
     *
     * @param args unused
     */
    def main(args: Array[String]): Unit = {

      val values = List(List("1", "One"), List("1", "Two"), List("2", "Three"), List("2", "4"))
        .map(x => (x(0), x(1)))
      val session = SparkSession.builder.config("spark.master", "local").getOrCreate
      import session.implicits._
      val dataFrame = values.toDF

      // First pass: append "Count".
      val expr = RowEncoder.apply(withIntColumn(dataFrame.schema, "Count"))
      val transform = dataFrame
        .repartition($"_1")
        .mapPartitions(appendGroupCount, expr)

      // Second pass over the result of the first one: append "Count1".
      val expr1 = RowEncoder.apply(withIntColumn(transform.schema, "Count1"))
      val transform2 = transform
        .repartition($"_1")
        .mapPartitions(appendGroupCount, expr1)

      transform2.show
    }
  }
Test.main(null)
{code}
 
The argument was: should Spark not behave consistently regardless of whether 
repartition or groupByKey is used?

> Dataframe resulting from a GroupByKey and flatMapGroups operation throws 
> java.lang.UnsupportedException when groupByKey is applied on it.
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26436
>                 URL: https://issues.apache.org/jira/browse/SPARK-26436
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.0
>            Reporter: Manish
>            Priority: Major
>
> There seems to be a bug on groupByKey api for cases when it (groupByKey) is 
> applied on a DataSet resulting from a former groupByKey and flatMapGroups 
> invocation.
> In such cases groupByKey throws the following exception:
> java.lang.UnsupportedException: fieldIndex on a Row without schema is 
> undefined.
>  
> Although the dataframe has a valid schema and a groupBy("key") or 
> repartition($"key") api calls on the same Dataframe and key succeed.
>  
> Following is the code that reproduces the scenario:
>  
> {code:scala}
>  
>    import org.apache.spark.sql.catalyst.encoders.RowEncoder
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.types.{ IntegerType, StructField, StructType}
> import scala.collection.mutable.ListBuffer
>   object Test {
>     def main(args: Array[String]): Unit = {
>       val values = List(List("1", "One") ,List("1", "Two") ,List("2", 
> "Three"),List("2","4")).map(x =>(x(0), x(1)))
>       val session = SparkSession.builder.config("spark.master", 
> "local").getOrCreate
>       import session.implicits._
>       val dataFrame = values.toDF
>       dataFrame.show()
>       dataFrame.printSchema()
>       val newSchema = StructType(dataFrame.schema.fields
>         ++ Array(
>         StructField("Count", IntegerType, false)
>       )
>       )
>       val expr = RowEncoder.apply(newSchema)
>       val tranform =  dataFrame.groupByKey(row => 
> row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
>         val inputSeq = inputItr.toSeq
>         val length = inputSeq.size
>         var listBuff = new ListBuffer[Row]()
>         var counter : Int= 0
>         for(i <- 0 until(length))
>         {
>           counter+=1
>         }
>         for(i <- 0 until length ) {
>           var x = inputSeq(i)
>           listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
>         }
>         listBuff.iterator
>       })(expr)
>       tranform.show
>       val newSchema1 = StructType(tranform.schema.fields
>         ++ Array(
>         StructField("Count1", IntegerType, false)
>       )
>       )
>       val expr1 = RowEncoder.apply(newSchema1)
>       val tranform2 =  tranform.groupByKey(row => 
> row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
>         val inputSeq = inputItr.toSeq
>         val length = inputSeq.size
>         var listBuff = new ListBuffer[Row]()
>         var counter : Int= 0
>         for(i <- 0 until(length))
>         {
>           counter+=1
>         }
>         for(i <- 0 until length ) {
>           var x = inputSeq(i)
>           listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
>         }
>         listBuff.iterator
>       })(expr1)
>       tranform2.show
>     }
> }
> Test.main(null)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to