We have a stream of products, each with an ID, and each product has a price 
which may be updated.

We want a running count of the number of products over £30.

Schema: Product(productID: Int, price: Int)

To handle these updates, we currently have…

——

val products = session.readStream.schema(productSchema).csv(productDataPath)
val productsVersioned = products.groupBy(“productId").agg(first(“price”))
val productsOver30 = productsVersioned.filter(“price > 
30”).agg(count(“productId”))
productsOver30.writeStream
        .outputMode("complete")
        .format("console")
        .start()
        .awaitTermination()

——

However, the ‘productsOver30’ part introduces the second aggregation.

On 15 Dec 2016, 22:28 +0000, Michael Armbrust <mich...@databricks.com>, wrote:
> What is your use case?
>
> > On Thu, Dec 15, 2016 at 10:43 AM, ljwagerfield 
> > <lawre...@dmz.wagerfield.com> wrote:
> > > The current version of Spark (2.0.2) only supports one aggregation per
> > > structured stream (and will throw an exception if multiple aggregations 
> > > are
> > > applied).
> > >
> > > Roughly when will Spark support multiple aggregations?
> > >
> > >
> > >
> > > --
> > > View this message in context: 
> > > http://apache-spark-user-list.1001560.n3.nabble.com/When-will-multiple-aggregations-be-supported-in-Structured-Streaming-tp28219.html
> > > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> > >
> > > ---------------------------------------------------------------------
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
>

Reply via email to