[ https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

DB Tsai updated SPARK-22231:
----------------------------
    Description: 
At Netflix, the algorithm team works on ranking problems to find the great 
content that fulfills the unique tastes of our members. Before building a 
recommendation algorithm, we need to prepare the training, testing, and 
validation datasets in Apache Spark. Due to the nature of ranking problems, we 
have a nested list of items to be ranked in one column, and the top level is 
the context describing the setting in which a model is to be used (e.g., 
profile, country, time, device, etc.). Here is a blog post describing the 
details: [Distributed Time Travel for Feature 
Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
 
To be more concrete, for ranking videos for a given profile_id in a given 
country, our data schema looks like this,
{code:java}
root
 |-- profile_id: long (nullable = true)
 |-- country_iso_code: string (nullable = true)
 |-- items: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- title_id: integer (nullable = true)
 |    |    |-- scores: double (nullable = true)
...
{code}
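
For a typed view of the same data, the schema above corresponds to case 
classes along the following lines (the field names are taken from the printed 
schema; the class names are made up for illustration),
{code:java}
// Hypothetical case-class model of the schema above.
case class RankedItem(title_id: Int, scores: Double)

case class RankingContext(
    profile_id: Long,
    country_iso_code: String,
    items: Seq[RankedItem])
{code}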

We often need to work on the nested list of structs by applying functions to 
them; sometimes we also drop columns from, or add new columns to, the nested 
structs. Currently, there is no easy way in open source Apache Spark to 
perform those operations with SQL primitives; as a workaround, many people 
convert the data into an RDD, work on the nested data there, and then 
reconstruct a new dataframe. This is extremely inefficient: optimizations such 
as predicate pushdown in SQL cannot be performed, we cannot leverage the 
columnar format, and the serialization and deserialization cost becomes huge 
even when we just want to add a single column at the nested level.
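
As a concrete sketch of that workaround (our illustration, using struct-typed 
data like the *withColumn* examples below), note how the typed round-trip 
forces every row to be fully deserialized into JVM objects even though only 
one nested field changes,
{code:java}
case class Item(a: Int, b: Double)
case class Data(foo: Int, bar: Double, items: Seq[Item])

import spark.implicits._

val df = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0)))
)).toDF()

// Round-trip through a typed Dataset (morally the same as going through
// an RDD): every Item is deserialized and re-serialized, and the
// optimizer cannot see through the lambda.
val result = df.as[Data]
  .map(d => d.copy(items = d.items.map(i => i.copy(b = i.b + 1))))
  .toDF()
{code}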

We built a solution internally at Netflix that we're very happy with. We plan 
to open source it in upstream Spark, and we would like to socialize the API 
design to see if we have missed any use cases.

The first API we added is *mapItems* on dataframe, which takes a function from 
*Column* to *Column* and applies it to each element of the nested column. Here 
is an example,
{code:java}
case class Data(foo: Int, bar: Double, items: Seq[Double])

val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
  Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
))

val result = df.mapItems("items") {
  item => item * 2.0
}

result.printSchema()
// root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// |    |-- element: double (containsNull = true)

result.show()
// +---+----+--------------------+
// |foo| bar|               items|
// +---+----+--------------------+
// | 10|10.0|[20.2, 20.4, 20.6...|
// | 20|20.0|[40.2, 40.4, 40.6...|
// +---+----+--------------------+
{code}
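
From this usage, a plausible shape for the API is the following (our reading 
of the example, not the exact signature of the implementation),
{code:java}
import org.apache.spark.sql.{Column, DataFrame}

// Hypothetical signature inferred from the example above: f is applied
// to each element of the named array column.
trait NestedMapOps {
  def mapItems(columnName: String)(f: Column => Column): DataFrame
}
{code}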

Now, with the ability to apply a function to the nested dataframe, we can add 
a new function, *withColumn*, on *Column* to add a new column or replace an 
existing column with the same name in the nested list of structs. Here are two 
examples demonstrating the API together with *mapItems*; the first one 
replaces the existing column,
{code:java}
case class Item(a: Int, b: Double)

case class Data(foo: Int, bar: Double, items: Seq[Item])

val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "b")
}

result.printSchema()
// root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- a: integer (nullable = true)
// |    |    |-- b: double (nullable = true)

result.show(false)
// +---+----+----------------------+
// |foo|bar |items                 |
// +---+----+----------------------+
// |10 |10.0|[[10,11.0], [11,12.0]]|
// |20 |20.0|[[20,21.0], [21,22.0]]|
// +---+----+----------------------+
{code}
and the second one adds a new column, *c*, in the nested dataframe.
{code:java}
val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "c")
}

result.printSchema()
// root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- a: integer (nullable = true)
// |    |    |-- b: double (nullable = true)
// |    |    |-- c: double (nullable = true)

result.show(false)
// +---+----+--------------------------------+
// |foo|bar |items                           |
// +---+----+--------------------------------+
// |10 |10.0|[[10,10.0,11.0], [11,11.0,12.0]]|
// |20 |20.0|[[20,20.0,21.0], [21,21.0,22.0]]|
// +---+----+--------------------------------+
{code}
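
Read off from these two examples, the new method on *Column* plausibly looks 
like this (again our inference from the usage, not the actual signature): the 
alias on the argument supplies the field name, so a single method covers both 
replacing and adding,
{code:java}
import org.apache.spark.sql.Column

// Hypothetical shape of the struct-level operation used above.
trait StructColumnOps {
  // Replaces the struct field whose name matches the alias of `field`,
  // or appends it as a new field if no such name exists.
  def withColumn(field: Column): Column
}
{code}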

We also implemented a filter over the nested list of structs: it keeps only 
the items that match the predicate. The following is an API example,
{code:java}
val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.filterItems("items") {
  item => item("a") < 20
}

result.show(false)
// +---+----+----------------------+
// |foo|bar |items                 |
// +---+----+----------------------+
// |10 |10.0|[[10,10.0], [11,11.0]]|
// |20 |20.0|[]                    |
// +---+----+----------------------+
{code}
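
For comparison, the closest equivalent we are aware of using only existing SQL 
primitives is to explode, filter, and re-aggregate, which is both more verbose 
and subtly lossy,
{code:java}
import org.apache.spark.sql.functions.{collect_list, explode}
import spark.implicits._

// Sketch of the explode/re-aggregate workaround for the same filter.
val kept = df
  .select($"foo", $"bar", explode($"items") as "item")
  .filter($"item.a" < 20)
  .groupBy($"foo", $"bar")
  .agg(collect_list($"item") as "items")
// Rows whose items are all filtered out (foo = 20 above) disappear from
// `kept` and must be restored with an extra outer join; filterItems
// keeps them with an empty array instead.
{code}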

Dropping a column in the nested list of structs can be achieved with an API 
similar to *withColumn*: we add a *drop* method to *Column* to implement this. 
Here is an example,
{code:java}
val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.mapItems("items") {
  item => item.drop("b")
}

result.printSchema()
// root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- a: integer (nullable = true)

result.show(false)
// +---+----+------------+
// |foo|bar |items       |
// +---+----+------------+
// |10 |10.0|[[10], [11]]|
// |20 |20.0|[[20], [21]]|
// +---+----+------------+
{code}
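
For contrast, dropping the same nested field today typically requires a UDF, 
which round-trips every item through JVM objects; this is exactly the 
serialization and deserialization cost that *drop* avoids. A hypothetical 
sketch,
{code:java}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import spark.implicits._

// Rebuild each struct without the "b" field via a UDF (illustration
// only). Every item is deserialized into a Row and re-created as a JVM
// object before being serialized back.
case class ItemWithoutB(a: Int)
val dropB = udf((items: Seq[Row]) =>
  items.map(r => ItemWithoutB(r.getAs[Int]("a"))))

val viaUdf = df.withColumn("items", dropB($"items"))
{code}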

As you can see, these APIs are not opaque to the Spark optimizer, and they can 
fully take advantage of the columnar data format.

We're looking forward to the community's feedback and suggestions! Thanks.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-22231
>                 URL: https://issues.apache.org/jira/browse/SPARK-22231
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: DB Tsai
>