Hi All,

Apologies for cross-posting this, but I'm wondering whether the dev list might be a better place for my questions below. For now, I'm developing a set of utilities for my own use, but if I can get them working, I'd like to see whether they might be worth contributing to the Spark project.
To summarize, I'm hoping to come up with a cleaner, more functional-programming-oriented way of defining custom grouping calculations on Datasets / DataFrames, as described in my first email (from August 12). My second email (from August 13) provides a smaller, more self-contained example that I think illustrates the core stumbling block I'm running into.

Thanks,
~ Andrew

---------- Forwarded message ---------
From: Andrew Leverentz <andrew.levere...@empiricotx.com>
Date: Tue, Aug 13, 2019 at 12:59 PM
Subject: Re: Custom aggregations: modular and lightweight solutions?
To: <u...@spark.apache.org>

Here's a simpler example that I think gets at the heart of what I'm trying to do: DynamicSchemaExample.scala <https://gist.github.com/alev000/5be58bffb2dbc64bcdcc45fefb025a6e>.

In that example, I dynamically create a sequence of Rows and a corresponding schema (StructType), but the RowEncoder derived from the schema doesn't seem to handle the nested structure of the Rows. The example fails with a similar error (in this case, "scala.Tuple2$mcDD$sp is not a valid external type for schema of struct<_1:double,_2:double>").

If I could find a way to get this example working (for arbitrary values of rowSize), I suspect it would also give me a solution to the custom-aggregation issue I outlined in my previous email. Any suggestions would be much appreciated.

Thanks,
~ Andrew

On Mon, Aug 12, 2019 at 5:31 PM Andrew Leverentz <andrew.levere...@empiricotx.com> wrote:

> Hi All,
>
> I'm attempting to clean up some Spark code that performs groupByKey /
> mapGroups to compute custom aggregations, and I could use some help
> understanding the Spark APIs needed to make my code more modular and
> maintainable.
>
> In particular, my current approach is as follows:
>
> - Start with a Dataset[CaseClass1].
> - Apply groupByKey(f), where f is a function that extracts a tuple of
>   keys.
> - Apply mapGroups(g), where g computes multiple custom aggregations:
>   - Iterate through the rows in each group, updating a large, mutable
>     CustomState object.
>   - At the end of each group, transform the current key and the
>     CustomState into an instance of CaseClass2.
>
> In other words, we start with a dataset of CaseClass1 objects and end up
> with a dataset of CaseClass2 objects, using instances of a complex
> CustomState class to store the intermediate state during the aggregation.
>
> We have dozens of custom aggregation calculations to perform on this
> data, and I'd like to be able to streamline the process of introducing
> new aggregations and comparing multiple parameterized variations of the
> same aggregations side by side. The current approach requires us to
> touch several tightly coupled pieces of code in order to add simple
> variations to existing aggregate functions.
>
> The UDAF API seems to be designed for this use case, but I've found
> creating new UDAFs to be just as cumbersome as maintaining my current
> code.
>
> To address this, I've tried a couple of approaches (described below),
> although I've run into problems with both of them.
>
> At a high level, both approaches require a Dataset[T], a key extractor
> function (T => K), and a collection of instances of a custom class
> GroupingCalculation[T, S, R]. Here, T is the data type of each row in
> the dataset, K is the type of the key by which the rows should be
> grouped, S is the type of the intermediate state during aggregation,
> and R is the result type of each aggregation.
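>
> For illustration, the interface I have in mind looks roughly like the
> following (the method names here are only a sketch; the actual
> definitions are in the Gist linked below):
>
>   // One aggregation over rows of type T, with intermediate state S
>   // and result R.
>   trait GroupingCalculation[T, S, R] {
>     def initialState: S              // state for a new, empty group
>     def update(state: S, row: T): S  // fold one row of the group into the state
>     def finish(state: S): R          // produce the group's result from the final state
>   }
>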
> In this context, the data types T and K are fixed, but the state and
> result types (S and R) may vary among the GroupingCalculation instances.
> The resulting DataFrame will have rows that are essentially
> concatenations of {K, R1, R2, ..., Rn}, where R1, ..., Rn are the result
> types of the GroupingCalculation instances.
>
> (1) My first approach constructs a UserDefinedAggregateFunction by
> applying ScalaReflection.schemaFor[...] to T, S, and R. After some
> digging and experimentation, I found a way to use CatalystTypeConverters
> and ExpressionEncoders to populate the MutableAggregationBuffer.
> Unfortunately, once I finally got it running, this approach ran 10x
> slower than the original approach described above. I suspect that the
> extra encoding/decoding layer on top of the UDAF is what slowed it down,
> so I'm setting this approach aside for now.
>
> (2) Using an API similar to that of (1), I replaced the implementation
> with one that uses groupByKey and mapGroups. This bypasses the need for
> a wrapper around a UDAF. Also, the internal state, rather than being
> encoded in a DataFrame, is simply stored in one mutable ArrayBuffer[Any]
> per group. An implementation of this approach is available here:
> https://gist.github.com/alev000/27d10a402ad250957b792091084932f4
>
> I feel that this implementation is promising, but I haven't been able to
> get some of the test cases in the above Gist to pass. In particular, my
> test cases "Test grouping calculations with various combinations of case
> classes" and "Test firstAndOnly" fail with the following runtime error
> messages, respectively:
>
> - "TestCase3 is not a valid external type for schema of
>   struct<a:int,b:double>"
> - "scala.Some is not a valid external type for schema of string"
>
> Would anyone be able to help me diagnose the runtime errors with
> approach (2), or suggest a better alternative?
>
> Thanks,
> ~ Andrew
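
P.S. In case it helps to have something smaller to point at, here is a
stripped-down, hypothetical sketch (not taken from either Gist) that
reproduces the same kind of "not a valid external type" error:

  import org.apache.spark.sql.{Row, SparkSession}
  import org.apache.spark.sql.catalyst.encoders.RowEncoder
  import org.apache.spark.sql.types._

  object ExternalTypeRepro {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .master("local[*]").appName("repro").getOrCreate()

      // A dynamically built schema containing a nested struct field.
      val schema = StructType(Seq(
        StructField("key", StringType),
        StructField("pair", StructType(Seq(
          StructField("_1", DoubleType),
          StructField("_2", DoubleType))))))

      val encoder = RowEncoder(schema)

      // Nested struct values supplied as Rows: this encodes fine.
      val goodRows = Seq(Row("a", Row(1.0, 2.0)))
      spark.createDataset(goodRows)(encoder).show()

      // Nested struct values supplied as Scala tuples: this fails with
      // "scala.Tuple2 ... is not a valid external type for schema of
      // struct<_1:double,_2:double>".
      val badRows = Seq(Row("a", (1.0, 2.0)))
      spark.createDataset(badRows)(encoder).show()

      spark.stop()
    }
  }

The Row-wrapped variant works here, but in my real code the nested values
are arbitrary tuples, case classes, and Options, and I haven't found a
clean and efficient way to convert them generically.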