[
https://issues.apache.org/jira/browse/SPARK-26436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747143#comment-16747143
]
Manish commented on SPARK-26436:
--------------------------------
[~viirya]: I meant: the same result as groupByKey with flatMapGroups can also be
achieved by repartition with the mapPartitions function, in which case Spark
does not throw the IllegalArgumentException. Here is the same code written with
repartition:
{code:scala}
import java.util
import com.google.common.collect.Lists
import org.apache.spark.api.java.function.MapPartitionsFunction
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import scala.collection.mutable
object Test {

  /**
   * Partition-local equivalent of groupByKey + flatMapGroups: groups the
   * rows of one partition by column "_1" and appends each group's size as
   * an extra integer column.
   *
   * Both mapPartitions calls in main used an identical copy-pasted body;
   * it is extracted here once.
   */
  private def appendGroupCount: MapPartitionsFunction[Row, Row] =
    new MapPartitionsFunction[Row, Row]() {
      override def call(iterator: util.Iterator[Row]): util.Iterator[Row] = {
        // Collect rows per key. NOTE: getOrElseUpdate allocates a FRESH
        // buffer for each new key. The original
        //   withDefaultValue(mutable.MutableList.empty[Row])
        // evaluates the default once, so every key silently shared the
        // SAME list instance — all groups ended up aliased to one list
        // containing every row of the partition.
        val groups = new mutable.HashMap[String, mutable.ListBuffer[Row]]()
        while (iterator.hasNext) {
          val row = iterator.next()
          val key = row.getAs[String]("_1")
          groups.getOrElseUpdate(key, mutable.ListBuffer.empty[Row]) += row
        }
        // Emit each row with its group's count appended. `rows.size`
        // replaces the original O(n) loop that incremented a counter once
        // per element just to recompute the length.
        val out = new util.ArrayList[Row]()
        for ((_, rows) <- groups) {
          val count = rows.size
          rows.foreach(r => out.add(Row.fromSeq(r.toSeq ++ Seq(count))))
        }
        out.iterator
      }
    }

  def main(args: Array[String]): Unit = {
    // Tuples directly; the original built List[List[String]] and mapped
    // x => (x(0), x(1)) to get the same pairs.
    val values = List(("1", "One"), ("1", "Two"), ("2", "Three"), ("2", "4"))
    val session = SparkSession.builder.config("spark.master", "local").getOrCreate
    import session.implicits._
    val dataFrame = values.toDF

    // First pass: repartition by key, append per-key row count as "Count".
    val schemaWithCount = StructType(
      dataFrame.schema.fields :+ StructField("Count", IntegerType, true))
    val transform = dataFrame
      .repartition($"_1")
      .mapPartitions(appendGroupCount, RowEncoder.apply(schemaWithCount))

    // Second pass over the result: append "Count1" the same way. This is
    // the step that, when written with groupByKey instead of repartition,
    // triggers the exception reported in SPARK-26436.
    val schemaWithCount1 = StructType(
      transform.schema.fields :+ StructField("Count1", IntegerType, true))
    val transform2 = transform            // was misspelled `tranform2`
      .repartition($"_1")
      .mapPartitions(appendGroupCount, RowEncoder.apply(schemaWithCount1))

    transform2.show
  }
}
Test.main(null)
{code}
The argument is: should Spark not behave consistently whether the grouping is
expressed with repartition or with groupByKey?
> Dataframe resulting from a GroupByKey and flatMapGroups operation throws
> java.lang.UnsupportedException when groupByKey is applied on it.
> -----------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-26436
> URL: https://issues.apache.org/jira/browse/SPARK-26436
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.0
> Reporter: Manish
> Priority: Major
>
> There seems to be a bug on groupByKey api for cases when it (groupByKey) is
> applied on a DataSet resulting from a former groupByKey and flatMapGroups
> invocation.
> In such cases groupByKey throws the following exception:
> java.lang.UnsupportedException: fieldIndex on a Row without schema is
> undefined.
>
> Although the dataframe has a valid schema and a groupBy("key") or
> repartition($"key") api calls on the same Dataframe and key succeed.
>
> Following is the code that reproduces the scenario:
>
> {code:scala}
>
> import org.apache.spark.sql.catalyst.encoders.RowEncoder
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.types.{ IntegerType, StructField, StructType}
> import scala.collection.mutable.ListBuffer
> object Test {
> def main(args: Array[String]): Unit = {
> val values = List(List("1", "One") ,List("1", "Two") ,List("2",
> "Three"),List("2","4")).map(x =>(x(0), x(1)))
> val session = SparkSession.builder.config("spark.master",
> "local").getOrCreate
> import session.implicits._
> val dataFrame = values.toDF
> dataFrame.show()
> dataFrame.printSchema()
> val newSchema = StructType(dataFrame.schema.fields
> ++ Array(
> StructField("Count", IntegerType, false)
> )
> )
> val expr = RowEncoder.apply(newSchema)
> val tranform = dataFrame.groupByKey(row =>
> row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
> val inputSeq = inputItr.toSeq
> val length = inputSeq.size
> var listBuff = new ListBuffer[Row]()
> var counter : Int= 0
> for(i <- 0 until(length))
> {
> counter+=1
> }
> for(i <- 0 until length ) {
> var x = inputSeq(i)
> listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
> }
> listBuff.iterator
> })(expr)
> tranform.show
> val newSchema1 = StructType(tranform.schema.fields
> ++ Array(
> StructField("Count1", IntegerType, false)
> )
> )
> val expr1 = RowEncoder.apply(newSchema1)
> val tranform2 = tranform.groupByKey(row =>
> row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
> val inputSeq = inputItr.toSeq
> val length = inputSeq.size
> var listBuff = new ListBuffer[Row]()
> var counter : Int= 0
> for(i <- 0 until(length))
> {
> counter+=1
> }
> for(i <- 0 until length ) {
> var x = inputSeq(i)
> listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
> }
> listBuff.iterator
> })(expr1)
> tranform2.show
> }
> }
> Test.main(null)
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]