[ https://issues.apache.org/jira/browse/SPARK-14160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Burak Yavuz updated SPARK-14160:
--------------------------------
    Description: 
This JIRA tracks the status of event-time windowing operations for continuous queries.

The proposed API is as follows. *This API is currently a proposal and may be subject to change.*

There are three parameters for a window:
 1. The time column. This will generally be the event-time column of the record, but it should also be possible to use ingestion time via an expression.
 2. The window length.
 3. The slide interval (optional). The slide interval creates a new window of the length given in (2) at each interval. If no slide interval is provided, tumbling windows are generated (see the sketch after this list).
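
To make the assignment concrete, here is a minimal sketch in plain Scala of which windows contain a given event timestamp. {code}windowsFor{code} and its millisecond parameters are hypothetical illustrations, not part of the proposed API:

{code}
// Sketch (hypothetical helper, not the proposed API): enumerate the
// [start, end) windows, in epoch milliseconds, that contain an event.
// Window starts are aligned to multiples of the slide interval; a
// tumbling window is simply the case slideMs == lengthMs.
def windowsFor(ts: Long, lengthMs: Long, slideMs: Long): Seq[(Long, Long)] = {
  // Latest aligned window start that is <= ts.
  val lastStart = Math.floorDiv(ts, slideMs) * slideMs
  // Walk backwards while [start, start + lengthMs) still covers ts.
  (lastStart to (ts - lengthMs + 1) by -slideMs).map(s => (s, s + lengthMs))
}
{code}

With tumbling windows ({code}slideMs == lengthMs{code}) this yields exactly one window per event.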

Examples:
Consider the following schema for our data:
{code} sensor_id, measurement, timestamp {code}

To generate 30-second tumbling windows and average the measurement values for each sensor, we might write something like:
{code}
df.window("timestamp", 30.seconds)
  .groupBy("sensor_id")
  .agg(mean("measurement").as("avg_meas"))
{code}

using the Dataset/DataFrame API.
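
For comparison, here is a sketch of a comparable tumbling-window aggregation using the {code}window{code} expression available in {code}org.apache.spark.sql.functions{code} from Spark 2.0 onward (an assumption made here for illustration; it is not the API proposed above):

{code}
// Comparison sketch (assumes Spark 2.0+): 30-second tumbling windows via a
// window() grouping expression instead of a df.window(...) method.
import org.apache.spark.sql.functions.{avg, col, window}

df.groupBy(window(col("timestamp"), "30 seconds"), col("sensor_id"))
  .agg(avg(col("measurement")).as("avg_meas"))
{code}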

To average 5 minutes of data every minute (a window length of 5 minutes with a slide interval of 1 minute), we would use:
{code}
df.window("timestamp", 5.minutes, 1.minute)
  .groupBy("sensor_id")
  .agg(mean("measurement").as("avg_meas"))
{code}

In this example, each data point will appear in multiple windows: five of them, since the window length is five times the slide interval.
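
Using the hypothetical {code}windowsFor{code} helper sketched earlier, the overlap is easy to see for these parameters:

{code}
// An event 10 minutes after the epoch, with 5-minute windows sliding
// every minute, lands in 5 windows (window length / slide interval):
windowsFor(ts = 600000L, lengthMs = 300000L, slideMs = 60000L)
// => (600000,900000), (540000,840000), (480000,780000),
//    (420000,720000), (360000,660000)
{code}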

To window on processing time instead, one may use:

{code}
df.withColumn("processing_time", current_timestamp())
  .window("timestamp", 5.minutes, 1.minute)
  .groupBy("sensor_id")
  .agg(mean("measurement").as("avg_meas"))
{code}
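
The same {code}withColumn{code} pattern should cover ingestion time as well: any expression that yields a timestamp column can serve as the window's time column.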

The resulting schema of the window operation will be:

{code} sensor_id, avg_meas, timestamp, window_range {code}

{code}window_range{code} will be a struct of {code}window_start (DateTime), window_end (DateTime){code}.
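
A minimal sketch of that struct in Spark SQL's type API (an assumption: {code}TimestampType{code} standing in for the {code}DateTime{code} above):

{code}
import org.apache.spark.sql.types._

// Hypothetical rendering of the window_range column's type.
val windowRangeType = StructType(Seq(
  StructField("window_start", TimestampType),
  StructField("window_end", TimestampType)))
{code}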




> Windowing for structured streaming
> ----------------------------------
>
>                 Key: SPARK-14160
>                 URL: https://issues.apache.org/jira/browse/SPARK-14160
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Burak Yavuz