On Tue, Feb 7, 2017 at 2:21 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> I think the fastest way is likely to use a combination of conditionals
> (when / otherwise), first (ignoring nulls), while grouping by the id.
> This should get the answer with only a single shuffle.
>
> Here is an example
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3908422850525880/2840265927289860/latest.html>
> .
>

Very cool! Using the simpler aggregates feels cleaner.


>
> On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi Everett,
>>
>> That's pretty much what I'd do. Can't think of a way to beat your
>> solution. Why do you "feel vaguely uneasy about it"?
>>
>
Maybe it felt like I was unnecessarily grouping-by twice, but probably
mostly that I hadn't used pivot before.

Interestingly, the physical plans are not especially different between
these two solutions after the rank column is added. They both have two
SortAggregates that seem to be figuring out where to put results based on
the rank:

My original one:

== Physical Plan ==
*Project [id#279, name#280, 1#372.extra AS extra1#409, 1#372.data AS
data1#435, 1#372.priority AS priority1#462, 2#374.extra AS extra2#490,
2#374.data AS data2#519, 2#374.priority AS priority2#549, 3#376.extra AS
extra3#580, 3#376.data AS data3#612, 3#376.priority AS priority3#645]
+- SortAggregate(key=[id#279,name#280], functions=[first(if ((cast(rank#292
as double) = 1.0)) temp_struct#312 else null, true),first(if
((cast(rank#292 as double) = 2.0)) temp_struct#312 else null,
true),first(if ((cast(rank#292 as double) = 3.0)) temp_struct#312 else
null, true)])
   +- SortAggregate(key=[id#279,name#280], functions=[partial_first(if
((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
true),partial_first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312
else null, true),partial_first(if ((cast(rank#292 as double) = 3.0))
temp_struct#312 else null, true)])
      +- *Project [id#279, name#280, rank#292, struct(extra#281, data#282,
priority#283) AS temp_struct#312]
         +- Window [denserank(priority#283) windowspecdefinition(id#279,
name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW) AS rank#292], [id#279, name#280], [priority#283 ASC]
            +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
               +- Exchange hashpartitioning(id#279, name#280, 200)
                  +- Scan
ExistingRDD[id#279,name#280,extra#281,data#282,priority#283]


And modifying Michael's slightly to use a rank:

import org.apache.spark.sql.functions._

def getColumnWithRank(column: String, rank: Int) = {
  first(when(col("rank") === lit(rank), col(column)).otherwise(null),
ignoreNulls = true)
}

val withRankColumn = data.withColumn("rank",
functions.dense_rank().over(Window.partitionBy("id",
"name").orderBy("priority")))

val modCollapsed = withRankColumn
  .groupBy($"id", $"name")
  .agg(
    getColumnWithRank("data", 1) as 'data1,
    getColumnWithRank("data", 2) as 'data2,
    getColumnWithRank("data", 3) as 'data3,
    getColumnWithRank("extra", 1) as 'extra1,
    getColumnWithRank("extra", 2) as 'extra2,
    getColumnWithRank("extra", 3) as 'extra3)


modCollapsed.explain

== Physical Plan ==
SortAggregate(key=[id#279,name#280], functions=[first(CASE WHEN (rank#965 =
1) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 2) THEN
data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 3) THEN data#282
ELSE null END, true),first(CASE WHEN (rank#965 = 1) THEN extra#281 ELSE
null END, true),first(CASE WHEN (rank#965 = 2) THEN extra#281 ELSE null
END, true),first(CASE WHEN (rank#965 = 3) THEN extra#281 ELSE null END,
true)])
+- SortAggregate(key=[id#279,name#280], functions=[partial_first(CASE WHEN
(rank#965 = 1) THEN data#282 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 2) THEN data#282 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 3) THEN data#282 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 1) THEN extra#281 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 2) THEN extra#281 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 3) THEN extra#281 ELSE null END, true)])
   +- *Project [id#279, name#280, extra#281, data#282, rank#965]
      +- Window [denserank(priority#283) windowspecdefinition(id#279,
name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW) AS rank#965], [id#279, name#280], [priority#283 ASC]
         +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
            +- Exchange hashpartitioning(id#279, name#280, 200)
               +- Scan
ExistingRDD[id#279,name#280,extra#281,data#282,priority#283]



>
>> I'd also check out the execution plan (with explain) to see how it's
>> gonna work at runtime. I may have seen groupBy + join be better than
>> window (there were more exchanges in play for windows I reckon).
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ever...@nuna.com>
>> wrote:
>> >
>> >
>> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl>
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> Could groupBy and withColumn or UDAF work perhaps? I think window could
>> >> help here too.
>> >
>> >
>> > This seems to work, but I do feel vaguely uneasy about it. :)
>> >
>> > // First add a 'rank' column which is priority order just in case
>> priorities
>> > aren't
>> > // from 1 with no gaps.
>> > val temp1 = data.withColumn("rank", functions.dense_rank()
>> >    .over(Window.partitionBy("id", "name").orderBy("priority")))
>> >
>> > +---+----+-----+------+--------+----+
>> > | id|name|extra|  data|priority|rank|
>> > +---+----+-----+------+--------+----+
>> > |  1|Fred|    8|value1|       1|   1|
>> > |  1|Fred|    8|value8|       2|   2|
>> > |  1|Fred|    8|value5|       3|   3|
>> > |  2| Amy|    9|value3|       1|   1|
>> > |  2| Amy|    9|value5|       2|   2|
>> > +---+----+-----+------+--------+----+
>> >
>> > // Now move all the columns we want to denormalize into a struct column
>> to
>> > keep them together.
>> > val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
>> > temp1("data"), temp1("priority")))
>> >   .drop("extra", "data", "priority")
>> >
>> > +---+----+----+------------+
>> > | id|name|rank| temp_struct|
>> > +---+----+----+------------+
>> > |  1|Fred|   1|[8,value1,1]|
>> > |  1|Fred|   2|[8,value8,2]|
>> > |  1|Fred|   3|[8,value5,3]|
>> > |  2| Amy|   1|[9,value3,1]|
>> > |  2| Amy|   2|[9,value5,2]|
>> > +---+----+----+------------+
>> >
>> > // groupBy, again, but now pivot the rank column. We need an aggregate
>> > function after pivot,
>> > // so use first -- there will only ever be one element.
>> > val temp3 = temp2.groupBy("id", "name")
>> >   .pivot("rank", Seq("1", "2", "3"))
>> >   .agg(functions.first("temp_struct"))
>> >
>> > +---+----+------------+------------+------------+
>> > | id|name|           1|           2|           3|
>> > +---+----+------------+------------+------------+
>> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
>> > |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
>> > +---+----+------------+------------+------------+
>> >
>> > // Now just moving things out of the structs and clean up.
>> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
>> >      .withColumn("data1", temp3("1").getField("data"))
>> >      .withColumn("priority1", temp3("1").getField("priority"))
>> >      .withColumn("extra2", temp3("2").getField("extra"))
>> >      .withColumn("data2", temp3("2").getField("data"))
>> >      .withColumn("priority2", temp3("2").getField("priority"))
>> >      .withColumn("extra3", temp3("3").getField("extra"))
>> >      .withColumn("data3", temp3("3").getField("data"))
>> >      .withColumn("priority3", temp3("3").getField("priority"))
>> >      .drop("1", "2", "3")
>> >
>> > +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3|
>> > data3|priority3|
>> > +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> > |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|
>> > 3|
>> > |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|
>> > null|
>> > +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >>
>> >>
>> >> Jacek
>> >>
>> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ever...@nuna.com.invalid>
>> >> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I'm trying to un-explode or denormalize a table like
>> >>>
>> >>> +---+----+-----+------+--------+
>> >>> |id |name|extra|data  |priority|
>> >>> +---+----+-----+------+--------+
>> >>> |1  |Fred|8    |value1|1       |
>> >>> |1  |Fred|8    |value8|2       |
>> >>> |1  |Fred|8    |value5|3       |
>> >>> |2  |Amy |9    |value3|1       |
>> >>> |2  |Amy |9    |value5|2       |
>> >>> +---+----+-----+------+--------+
>> >>>
>> >>> into something that looks like
>> >>>
>> >>>
>> >>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3
>> >>> |priority3|
>> >>>
>> >>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >>> |1  |Fred|8     |value1|1        |8     |value8|2        |8
>>  |value5|3
>> >>> |
>> >>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null
>> >>> |null     |
>> >>>
>> >>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >>>
>> >>> If I were going the other direction, I'd create a new column with an
>> >>> array of structs, each with 'extra', 'data', and 'priority' fields
>> and then
>> >>> explode it.
>> >>>
>> >>> Going from the more normalized view, though, I'm having a harder time.
>> >>>
>> >>> I want to group or partition by (id, name) and order by priority, but
>> >>> after that I can't figure out how to get multiple rows rotated into
>> one.
>> >>>
>> >>> Any ideas?
>> >>>
>> >>> Here's the code to create the input table above:
>> >>>
>> >>> import org.apache.spark.sql.Row
>> >>> import org.apache.spark.sql.Dataset
>> >>> import org.apache.spark.sql.types._
>> >>>
>> >>> val rowsRDD = sc.parallelize(Seq(
>> >>>     Row(1, "Fred", 8, "value1", 1),
>> >>>     Row(1, "Fred", 8, "value8", 2),
>> >>>     Row(1, "Fred", 8, "value5", 3),
>> >>>     Row(2, "Amy", 9, "value3", 1),
>> >>>     Row(2, "Amy", 9, "value5", 2)))
>> >>>
>> >>> val schema = StructType(Seq(
>> >>>     StructField("id", IntegerType, nullable = true),
>> >>>     StructField("name", StringType, nullable = true),
>> >>>     StructField("extra", IntegerType, nullable = true),
>> >>>     StructField("data", StringType, nullable = true),
>> >>>     StructField("priority", IntegerType, nullable = true)))
>> >>>
>> >>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>> >>>
>> >>>
>> >>>
>> >
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>

Reply via email to