[ https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007116#comment-17007116 ]
fqaiser94 edited comment on SPARK-22231 at 1/2/20 11:25 PM:
------------------------------------------------------------

Hi folks, I can personally affirm that this would be a valuable feature to have in Spark. Looking around, it's clear to me that other people need this feature as well, e.g. SPARK-16483, [Mailing list|http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-SQL-Applying-transformation-on-a-struct-inside-an-array-td18934.html], etc.

A few things have changed since the last comments in this discussion. Support for higher-order functions has been [added|https://databricks.com/blog/2018/11/16/introducing-new-built-in-functions-and-higher-order-functions-for-complex-data-types-in-apache-spark.html] to the Apache Spark project. In particular, {{transform}} and {{filter}} functions are now available for operating on ArrayType columns. These are the equivalents of the {{mapItems}} and {{filterItems}} functions previously proposed in this ticket.

To complete this ticket, then, I think all that is needed is to add the {{withField}}, {{withFieldRenamed}}, and {{drop}} methods to the Column class. From the discussion, I would summarize the signatures for these new methods as follows:

- {{def withField(fieldName: String, field: Column): Column}} Returns a new StructType column with the field added, or replaced if a field with that name already exists.
- {{def drop(fieldNames: String*): Column}} Returns a new StructType column with the given fields dropped.
- {{def withFieldRenamed(existingName: String, newName: String): Column}} Returns a new StructType column with the field renamed.

Since nobody seemed to be actively working on this, I went ahead and created a pull request that adds a {{withField}} method to the {{Column}} class, conforming to the specs discussed in this ticket. You can review the PR here: [https://github.com/apache/spark/pull/27066]

As this is my first PR to the Apache Spark project, I wanted to keep the PR small.
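For a top-level (non-array) struct column, the three proposed methods can already be approximated by rebuilding the struct with {{functions.struct}}. The sketch below assumes Spark 2.x or later with spark-sql on the classpath. The helper names ({{withFieldSketch}}, {{dropFieldsSketch}}, {{withFieldRenamedSketch}}) are hypothetical and are not the API in the PR. An unresolved {{Column}} does not carry its struct's field names, so the helpers take the DataFrame and read the field names from its schema.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.types.StructType

// Hypothetical helpers approximating the proposed withField / drop /
// withFieldRenamed methods for a top-level struct column by rebuilding
// the struct field by field. Not the API from the PR.
object StructFieldSketch {

  // Field names of the struct column `structName`, read from the schema.
  private def fieldNames(df: DataFrame, structName: String): Seq[String] =
    df.schema(structName).dataType.asInstanceOf[StructType].fieldNames.toSeq

  // withField(fieldName, field): replace the field in place if the name
  // already exists, otherwise append it as the last field.
  def withFieldSketch(df: DataFrame, structName: String,
                      fieldName: String, field: Column): DataFrame = {
    val names = fieldNames(df, structName)
    val rebuilt = names.map { n =>
      if (n == fieldName) field.as(n) else col(structName).getField(n).as(n)
    }
    val all = if (names.contains(fieldName)) rebuilt else rebuilt :+ field.as(fieldName)
    df.withColumn(structName, struct(all: _*))
  }

  // drop(fieldNames: String*): keep every field that is not listed.
  def dropFieldsSketch(df: DataFrame, structName: String, toDrop: String*): DataFrame = {
    val kept = fieldNames(df, structName)
      .filterNot(n => toDrop.contains(n))
      .map(n => col(structName).getField(n).as(n))
    df.withColumn(structName, struct(kept: _*))
  }

  // withFieldRenamed(existingName, newName): alias one field, keep the rest.
  def withFieldRenamedSketch(df: DataFrame, structName: String,
                             existingName: String, newName: String): DataFrame = {
    val renamed = fieldNames(df, structName).map { n =>
      col(structName).getField(n).as(if (n == existingName) newName else n)
    }
    df.withColumn(structName, struct(renamed: _*))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("struct-field-sketch").getOrCreate()
    import spark.implicits._

    // One row with a struct column s = {a: 10, b: 10.0}.
    val df = Seq((10, 10.0)).toDF("a", "b").select(struct(col("a"), col("b")).as("s"))

    withFieldSketch(df, "s", "b", col("s.b") + 1).show() // b replaced in place
    withFieldSketch(df, "s", "c", lit("new")).show()     // c appended at the end
    dropFieldsSketch(df, "s", "b").printSchema()         // only a remains
    withFieldRenamedSketch(df, "s", "b", "score").printSchema()

    spark.stop()
  }
}
```

Note one behavioral difference. {{struct(...)}} never returns null, so a null input struct comes back as a struct whose fields are all null.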
However, I wouldn't mind also writing the {{drop}} and {{withFieldRenamed}} methods in separate PRs once the current PR is accepted.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-22231
>                 URL: https://issues.apache.org/jira/browse/SPARK-22231
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: DB Tsai
>            Assignee: Jeremy Smith
>            Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find great content to fulfill the unique tastes of our members. Before building recommendation algorithms, we need to prepare the training, testing, and validation datasets in Apache Spark. Because of the nature of ranking problems, we have a nested list of items to be ranked in one column, while the top level holds the contexts describing the setting in which a model is to be used (e.g. profiles, country, time, device, etc.). Here is a blog post describing the details: [Distributed Time Travel for Feature Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>
> To be more concrete, for the ranks of videos for a given profile_id in a given country, our data schema looks like this:
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  |    |-- element: struct (containsNull = false)
>  |    |    |-- title_id: integer (nullable = true)
>  |    |    |-- scores: double (nullable = true)
> ...
> {code}
> We often need to work on the nested list of structs by applying functions to them. Sometimes, we drop or add columns in the nested list of structs.
> Currently, there is no easy way in open source Apache Spark to perform those operations using SQL primitives. Many people convert the data into an RDD to work on the nested level of data, and then reconstruct a new DataFrame as a workaround. This is extremely inefficient. Optimizations such as predicate pushdown cannot be performed, we cannot take advantage of the columnar format, and the serialization and deserialization costs become huge even if we just want to add a new column at the nested level.
> We built a solution internally at Netflix that we're very happy with, and we plan to open-source it upstream in Spark. We would like to socialize the API design to see whether we have missed any use cases.
> The first API we added is *mapItems* on DataFrame. It takes a function from *Column* to *Column* and applies it to each element of the nested array column. Here is an example:
> {code:java}
> import org.apache.spark.sql.Dataset
> import spark.implicits._ // Encoder[Data] for createDataset (auto-imported in spark-shell)
>
> case class Data(foo: Int, bar: Double, items: Seq[Double])
>
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
>
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
>
> result.printSchema()
> // root
> //  |-- foo: integer (nullable = false)
> //  |-- bar: double (nullable = false)
> //  |-- items: array (nullable = true)
> //  |    |-- element: double (containsNull = true)
>
> result.show()
> // +---+----+--------------------+
> // |foo| bar|               items|
> // +---+----+--------------------+
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+----+--------------------+
> {code}
> Now that we can apply a function to the nested data, we can add a new function, *withColumn* in *Column*. It adds a column to the nested list of structs, or replaces the existing column with the same name.
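The *mapItems* example above corresponds to the {{transform}} higher-order function that Spark 2.4 added for array columns. Here is a minimal sketch, assuming Spark 2.4 or later and a local SparkSession. The object name {{TransformSketch}} is made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Same shape as the Data class in the mapItems example above.
case class Data(foo: Int, bar: Double, items: Seq[Double])

object TransformSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("transform-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(
      Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
      Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
    ).toDS()

    // Rough equivalent of df.mapItems("items") { item => item * 2.0 }, using the
    // SQL higher-order function transform(array, lambda) available since Spark 2.4.
    // On Spark 3.0+ the Scala DSL form is functions.transform(col("items"), x => x * 2).
    val result = df.withColumn("items", expr("transform(items, x -> x * 2)"))

    result.printSchema()
    result.show(false)
    spark.stop()
  }
}
```

{{transform}} is evaluated as a Spark SQL expression, so the data stays in the DataFrame. This avoids the RDD round trip described earlier.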
> Here are two examples that use the API together with *mapItems*. The first one replaces the existing column:
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
>
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
>
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
>
> result.printSchema
> // root
> //  |-- foo: integer (nullable = false)
> //  |-- bar: double (nullable = false)
> //  |-- items: array (nullable = true)
> //  |    |-- element: struct (containsNull = true)
> //  |    |    |-- a: integer (nullable = true)
> //  |    |    |-- b: double (nullable = true)
>
> result.show(false)
> // +---+----+----------------------+
> // |foo|bar |items                 |
> // +---+----+----------------------+
> // |10 |10.0|[[10,11.0], [11,12.0]]|
> // |20 |20.0|[[20,21.0], [21,22.0]]|
> // +---+----+----------------------+
> {code}
> The second one adds a new column to the nested structs:
> {code:java}
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
>
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "c")
> }
>
> result.printSchema
> // root
> //  |-- foo: integer (nullable = false)
> //  |-- bar: double (nullable = false)
> //  |-- items: array (nullable = true)
> //  |    |-- element: struct (containsNull = true)
> //  |    |    |-- a: integer (nullable = true)
> //  |    |    |-- b: double (nullable = true)
> //  |    |    |-- c: double (nullable = true)
>
> result.show(false)
> // +---+----+--------------------------------+
> // |foo|bar |items                           |
> // +---+----+--------------------------------+
> // |10 |10.0|[[10,10.0,11.0], [11,11.0,12.0]]|
> // |20 |20.0|[[20,20.0,21.0], [21,21.0,22.0]]|
> // +---+----+--------------------------------+
> {code}
> We also implement a filter predicate for the nested list of structs, which returns the items that match the predicate. Here is an API example:
> {code:java}
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
>
> val result = df.filterItems("items") {
>   item => item("a") < 20
> }
>
> result.show(false)
> // +---+----+----------------------+
> // |foo|bar |items                 |
> // +---+----+----------------------+
> // |10 |10.0|[[10,10.0], [11,11.0]]|
> // |20 |20.0|[]                    |
> // +---+----+----------------------+
> {code}
> Dropping a column in the nested list of structs works through an API similar to *withColumn*. We add a *drop* method to *Column* to implement this.
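The nested *withColumn*, *filterItems*, and *drop* operations described above can be approximated with the {{filter}} and {{transform}} higher-order functions from Spark 2.4. Each struct is rebuilt with {{named_struct}}. This sketch makes the same assumptions as before: Spark 2.4+, a local session, and a made-up object name, {{NestedStructSketch}}.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Same shapes as the Item/Data classes in the withColumn examples.
case class Item(a: Int, b: Double)
case class Data(foo: Int, bar: Double, items: Seq[Item])

object NestedStructSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("nested-struct-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(
      Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
      Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
    ).toDS()

    // ~ filterItems("items") { item => item("a") < 20 }
    val filtered = df.withColumn("items", expr("filter(items, x -> x.a < 20)"))

    // ~ withColumn(item("b") + 1 as "b"): rebuild each struct, replacing b.
    val replaced = df.withColumn("items",
      expr("transform(items, x -> named_struct('a', x.a, 'b', x.b + 1))"))

    // ~ withColumn(item("b") + 1 as "c"): rebuild each struct with an extra field c.
    val added = df.withColumn("items",
      expr("transform(items, x -> named_struct('a', x.a, 'b', x.b, 'c', x.b + 1))"))

    // ~ drop("b"): rebuild each struct without b.
    val dropped = df.withColumn("items",
      expr("transform(items, x -> named_struct('a', x.a))"))

    Seq(filtered, replaced, added, dropped).foreach(_.show(false))
    spark.stop()
  }
}
```

The struct-rebuilding variants must list every field by hand, which becomes unwieldy for wide structs. That boilerplate is what a dedicated nested *withColumn*/*drop* would remove.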
> Here is an example:
> {code:java}
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
>
> val result = df.mapItems("items") {
>   item => item.drop("b")
> }
>
> result.printSchema
> // root
> //  |-- foo: integer (nullable = false)
> //  |-- bar: double (nullable = false)
> //  |-- items: array (nullable = true)
> //  |    |-- element: struct (containsNull = true)
> //  |    |    |-- a: integer (nullable = true)
>
> result.show(false)
> // +---+----+------------+
> // |foo|bar |items       |
> // +---+----+------------+
> // |10 |10.0|[[10], [11]]|
> // |20 |20.0|[[20], [21]]|
> // +---+----+------------+
> {code}
> Note that all of these APIs are implemented as SQL expressions with codegen. As a result, they are not opaque to the Spark optimizers and can take full advantage of the columnar data structure.
> We're looking forward to the community's feedback and suggestions! Thanks.

--
This message was sent by Atlassian Jira (v8.3.4#803005)