I haven't tailored it to your exact needs, but here is what I can offer; the
idea should be pretty clear:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, struct}

// Obtain a SparkSession (in spark-shell one is already available as `spark`).
val spark: SparkSession = SparkSession.builder()
  .appName("collect-list-example")
  .getOrCreate()

import spark.implicits._

case class Input(
  a: Int,
  b: Long,
  c: Int,
  d: Int,
  e: String
)

case class Output(
  a: Int,
  b: Long,
  data: Seq[(Int, Int, String)]
)

val df = spark.createDataFrame(Seq(
  Input(1, 1, 1, 1, "a"),
  Input(1, 1, 1, 1, "b"),
  Input(1, 1, 1, 1, "c"),
  Input(1, 2, 3, 3, "d")
))

// Aggregate with the DataFrame API, then drop down to the internal row
// representation (queryExecution.toRdd) to map each aggregated row into a
// strongly typed Output.
val dfOut = df.groupBy("a", "b")
  .agg(collect_list(struct($"c", $"d", $"e")))
  .queryExecution.toRdd.mapPartitions(_.map { r =>
    // The grouping columns come first in the aggregate output.
    val a = r.getInt(0)
    val b = r.getLong(1)

    // The collect_list result is an ArrayData of three-field structs.
    val list = r.getArray(2)

    Output(
      a,
      b,
      (0 until list.numElements()).map { i =>
        val element = list.getStruct(i, 3)
        val c = element.getInt(0)
        val d = element.getInt(1)
        val e = element.getString(2)

        (c, d, e)
      }
    )
  }).toDF()
dfOut.explain(extended = true)
dfOut.show()
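
For the two-lists part of the question below, there is also a much simpler
route that stays in the DataFrame API: build one Column per (function, column)
pair and pass them all to a single agg() call. A rough sketch (the names
aggCols, aggFuncs and dfMulti are mine, not from the question):

import org.apache.spark.sql.functions.expr

// Hypothetical input lists: columns to aggregate and functions to apply.
val aggCols  = Seq("c", "d")
val aggFuncs = Seq("min", "max", "mean")

// One aliased expression per (function, column) pair, e.g. min(c) AS min_c,
// so every aggregate lands in its own, predictably named output column.
val aggExprs = for {
  f <- aggFuncs
  c <- aggCols
} yield expr(s"$f($c)").alias(s"${f}_$c")

val dfMulti = df.groupBy("a", "b")
  .agg(aggExprs.head, aggExprs.tail: _*)

dfMulti.show()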


On Mon, Aug 28, 2017 at 10:47 AM, Patrick <titlibat...@gmail.com> wrote:

> Hi
>
> I have two lists:
>
>
>    - List one: contains the names of the columns on which I want to do
>    aggregate operations.
>    - List two: contains the aggregate operations that I want to perform on
>    each column, e.g. (min, max, mean).
>
> I am trying to use the Spark 2.0 Dataset API to achieve this. Spark provides
> an agg() where you can pass a Map<String, String> (of column name and
> respective aggregate operation) as input; however, I want to perform
> different aggregation operations on the same column of the data and want to
> collect the result in a Map<String, String> where the key is the aggregate
> operation and the value is the result on that particular column. If I add
> different agg() calls for the same column, the key gets updated with the
> latest value.
>
> Also, I don't find any collectAsMap() operation that returns a map of
> aggregated column name as key and result as value. I get collectAsList(),
> but I don't know the order in which those agg() operations are run, so how do
> I match which list value corresponds to which agg operation. I am able to see
> the result using .show(), but how can I collect the result in this case?
>
> Is it possible to do different aggregations on the same column in one
> job (i.e. only one collect operation) using the agg() operation?
>
>
> Thanks in advance.
>
>
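
P.S. On the collectAsMap() part of the question: a DataFrame has no
collectAsMap(), but each collected Row can be turned into a Map keyed by the
aliased column names with Row.getValuesMap, so you never have to rely on the
order in which the aggregates were computed. Another rough sketch, reusing the
hypothetical dfMulti from above:

// One Map per group, e.g. Map("a" -> 1, "b" -> 1, "min_c" -> 1, ...).
val resultMaps: Array[Map[String, Any]] = dfMulti.collect().map { row =>
  row.getValuesMap[Any](row.schema.fieldNames)
}

resultMaps.foreach(println)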
