[ 
https://issues.apache.org/jira/browse/SPARK-57133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikolina Vraneš updated SPARK-57133:
------------------------------------
    Description: 
h2. Summary

 

Add a new SQL relation operator {{BIN BY (...)}} that aligns range-typed rows 
to fixed-width bin boundaries by splitting any row whose {{[range_start, 
range_end)}} crosses a boundary and proportionally redistributing user-selected 
values across the resulting sub-ranges. Target use case: telemetry and 
observability data where each row carries its own measurement window 
(OpenTelemetry, Prometheus exports).

The operator occupies the same grammar position as {{PIVOT}} / {{UNPIVOT}} 
(post-{{FROM}} relation extension), composes with {{GROUP BY}} / {{WHERE}} 
/ {{JOIN}} downstream, and produces an ordinary relation.

  
h2. Syntax

 

 
{code:sql}
SELECT * FROM relation BIN BY (
  RANGE rangeStartCol TO rangeEndCol
  BIN WIDTH widthExpr
  [ALIGN TO originExpr]
  DISTRIBUTE UNIFORM (distributeCol [, distributeCol ...])
  [BIN_START AS aliasName]
  [BIN_END AS aliasName]
  [BIN_DISTRIBUTE_RATIO AS aliasName]
) [AS resultAlias];
{code}
 

{{BIN BY}} also works as a SQL pipe-operator stage on par with {{PIVOT}} / 
{{UNPIVOT}}:

 

 
{code:sql}
FROM relation |> BIN BY (
  RANGE rangeStartCol TO rangeEndCol
  BIN WIDTH widthExpr
  DISTRIBUTE UNIFORM (distributeCol)
);
{code}
  
h2. Clauses

 
 * {{RANGE rangeStartCol TO rangeEndCol}}: two TIMESTAMP or TIMESTAMP_NTZ 
columns from the input relation defining each row's measurement window 
{{[range_start, range_end)}}. Both columns must have the same type.
 * {{BIN WIDTH widthExpr}}: a day-time interval expression defining the bin 
size. Must be positive and foldable.
 * {{ALIGN TO originExpr}}: optional alignment anchor for the bin grid. Must be 
the same type as the range columns and must be foldable. When omitted, the 
default origin is {{1970-01-01 00:00:00}}, anchored in the session time zone 
for TIMESTAMP (LTZ) and taken as a wall-clock value (epoch) for TIMESTAMP_NTZ.
 * {{DISTRIBUTE UNIFORM (distributeCol [, distributeCol ...])}}: one or more 
FLOAT or DOUBLE columns whose values are proportionally redistributed across 
sub-rows. Required; at least one column must be listed.
 * {{BIN_START AS}} / {{BIN_END AS}} / {{BIN_DISTRIBUTE_RATIO AS}}: optional 
rename clauses for the three appended output columns. When omitted, the default 
names ({{bin_start}}, {{bin_end}}, {{bin_distribute_ratio}}) are used.
 * {{AS resultAlias}}: optional table alias on the whole {{BIN BY}} relation 
(matches the {{PIVOT}} / {{UNPIVOT}} convention).
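
The bin grid implied by {{BIN WIDTH}} and {{ALIGN TO}} can be modelled outside Spark. The following is a minimal Python sketch, not the operator's implementation: the name {{bin_bounds}} is hypothetical, and it covers only plain wall-clock arithmetic (the TIMESTAMP_NTZ / sub-day case), ignoring session-time-zone handling.

```python
from datetime import datetime, timedelta

# Default origin named in the ALIGN TO clause description.
EPOCH = datetime(1970, 1, 1)


def bin_bounds(ts, width, origin=EPOCH):
    """Return (bin_start, bin_end) of the grid cell that contains ts.

    Hypothetical helper: models a fixed-width grid anchored at `origin`,
    using naive datetimes only (no time-zone or DST handling).
    """
    if width <= timedelta(0):
        raise ValueError("bin width must be positive")
    # timedelta // timedelta floors, so timestamps before the origin
    # still land in the correct cell.
    cell_index = (ts - origin) // width
    bin_start = origin + cell_index * width
    return bin_start, bin_start + width


if __name__ == "__main__":
    ts = datetime(2024, 1, 1, 10, 7)
    # Default origin, 5-minute bins.
    print(bin_bounds(ts, timedelta(minutes=5)))
    # Custom origin at half past the hour, hourly bins.
    print(bin_bounds(ts, timedelta(hours=1), datetime(2024, 1, 1, 0, 30)))
```

With the custom origin, hourly cells start at half past each hour, so 10:07 falls into the cell starting at 09:30.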

 
h2. Output schema

  

Input columns, plus three appended columns (default names shown; renameable via 
the optional {{AS}} clauses):
 * {{bin_start}} ({{TIMESTAMP}} or {{TIMESTAMP_NTZ}}, matches the input range 
column type)
 * {{bin_end}} (same type as {{bin_start}})
 * {{bin_distribute_ratio}} ({{DOUBLE}}): fraction of the original 
{{[range_start, range_end)}} duration that fell into this bin. {{1.0}} for 
unsplit rows.

 
h2. Behavior

 
 * For each input row, the operator emits one output row per bin that overlaps 
{{[range_start, range_end)}}.
 * For each sub-row: the appended {{bin_start}} / {{bin_end}} hold the 
overlapping bin's grid boundaries; {{DISTRIBUTE UNIFORM}} columns are scaled by 
the fraction of the original range that falls inside the bin; all other input 
columns, including the original range columns, are replicated unchanged. The 
input range columns are not truncated or clipped; the truncated sub-window is 
derivable as {{truncatedRangeStart = max(range_start, bin_start)}} and 
{{truncatedRangeEnd = min(range_end, bin_end)}}.
 * {{bin_distribute_ratio}} reports the per-sub-row fraction (the truncated 
duration over the original range duration).
 * For TIMESTAMP, calendar-day components of day-time interval bins are aligned 
to the session time zone. Sub-day bins stay on pure microsecond arithmetic 
regardless of zone.
 * TIMESTAMP_NTZ bin boundaries use UTC arithmetic.
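
The splitting rules above can be checked against a small reference model. This Python sketch is an illustration under stated assumptions, not Spark code: the function name {{bin_by_row}} and the tuple layout are hypothetical, only naive wall-clock timestamps are handled, and empty or inverted ranges are rejected because the description does not say how the operator treats them.

```python
from datetime import datetime, timedelta


def bin_by_row(range_start, range_end, values, width,
               origin=datetime(1970, 1, 1)):
    """Split one row into per-bin sub-rows.

    Returns a list of (bin_start, bin_end, ratio, scaled_values) tuples,
    one per bin overlapping the half-open window [range_start, range_end).
    Hypothetical model of the described behaviour.
    """
    total = range_end - range_start
    if total <= timedelta(0):
        # Assumption: non-empty ranges only; the source leaves this case open.
        raise ValueError("range_end must be after range_start")
    # First grid cell that contains range_start.
    bin_start = origin + ((range_start - origin) // width) * width
    sub_rows = []
    while bin_start < range_end:
        bin_end = bin_start + width
        # Truncated sub-window: max(range_start, bin_start) .. min(range_end, bin_end).
        overlap = min(range_end, bin_end) - max(range_start, bin_start)
        ratio = overlap / total  # timedelta / timedelta yields a float
        sub_rows.append((bin_start, bin_end, ratio,
                         [v * ratio for v in values]))
        bin_start = bin_end
    return sub_rows


if __name__ == "__main__":
    # A 10-minute window crossing two 5-minute boundaries.
    for sub_row in bin_by_row(datetime(2024, 1, 1, 10, 3),
                              datetime(2024, 1, 1, 10, 13),
                              [100.0], timedelta(minutes=5)):
        print(sub_row)
```

For the sample window the overlaps are 2, 5 and 3 minutes out of 10, so the ratios sum to 1 and the scaled values sum back to the original value, up to floating-point rounding.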

 
h2. Examples

 

 
{code:sql}
-- 5-minute alignment, single value column, default output names, default origin
SELECT * FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 5 MINUTES
  DISTRIBUTE UNIFORM (value)
);

-- Custom origin, hourly bins, renamed boundary columns
SELECT * FROM metrics BIN BY (
  RANGE ts_begin TO ts_end
  BIN WIDTH INTERVAL 1 HOUR
  ALIGN TO TIMESTAMP '2024-01-01 00:30:00'
  DISTRIBUTE UNIFORM (bytes_sent, requests)
  BIN_START AS window_start
  BIN_END AS window_end
);

-- Composed with GROUP BY for downstream aggregation
SELECT bin_start, SUM(value) AS total
FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
)
GROUP BY bin_start
ORDER BY bin_start;

-- Trailing alias on the relation result
SELECT b.bin_start, SUM(b.value) AS total
FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
) AS b
GROUP BY b.bin_start;

-- SQL pipe-operator form
FROM metrics |> BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
);
{code}
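
To see what the {{GROUP BY bin_start}} composition above would compute, the per-bin totals can be modelled in plain Python. This is a hedged sketch with hypothetical names ({{split_value}}, {{totals_per_bin}}) and made-up sample rows; it assumes naive wall-clock timestamps, non-empty ranges, and a single distributed column.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def split_value(range_start, range_end, value, width,
                origin=datetime(1970, 1, 1)):
    """Yield (bin_start, share) pairs for one row (hypothetical model)."""
    total = range_end - range_start
    if total <= timedelta(0):
        # Assumption: non-empty ranges only.
        raise ValueError("range_end must be after range_start")
    bin_start = origin + ((range_start - origin) // width) * width
    while bin_start < range_end:
        bin_end = bin_start + width
        overlap = min(range_end, bin_end) - max(range_start, bin_start)
        yield bin_start, value * (overlap / total)
        bin_start = bin_end


def totals_per_bin(rows, width):
    """Model of SUM(value) ... GROUP BY bin_start ORDER BY bin_start."""
    totals = defaultdict(float)
    for range_start, range_end, value in rows:
        for bin_start, share in split_value(range_start, range_end,
                                            value, width):
            totals[bin_start] += share
    return dict(sorted(totals.items()))


if __name__ == "__main__":
    # Illustrative rows: (time_start, time_end, value).
    rows = [
        (datetime(2024, 1, 1, 10, 0, 30), datetime(2024, 1, 1, 10, 1, 30), 60.0),
        (datetime(2024, 1, 1, 10, 1, 0), datetime(2024, 1, 1, 10, 2, 0), 10.0),
    ]
    for bin_start, total in totals_per_bin(rows, timedelta(minutes=1)).items():
        print(bin_start, total)
```

The first sample row straddles a minute boundary and contributes half of its value to each minute, while the second fits one bin exactly, so the grand total across bins equals the sum of the input values.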
  

  was:
h2. Summary

 

Add a new SQL relation operator {{BIN BY (...)}} that aligns range-typed rows 
to fixed-width bin boundaries by splitting any row whose {{[range_start, 
range_end)}} crosses a boundary and proportionally redistributing user-selected 
values across the resulting sub-ranges. Target use case: telemetry and 
observability data where each row carries its own measurement window 
(OpenTelemetry, Prometheus exports).

The operator occupies the same grammar position as {{PIVOT}} / {{UNPIVOT}} 
(post-{{FROM}} relation extension), composes with {{GROUP BY}} / {{WHERE}} 
/ {{JOIN}} downstream, and produces an ordinary relation.

  
h2. Syntax

 

 
{code:sql}
SELECT * FROM relation BIN BY (
  RANGE rangeStartCol TO rangeEndCol
  BIN WIDTH widthExpr
  [ALIGN TO originExpr]
  DISTRIBUTE UNIFORM (distributeCol [, distributeCol ...])
  [BIN_START AS aliasName]
  [BIN_END AS aliasName]
  [BIN_DISTRIBUTE_RATIO AS aliasName]
) [AS resultAlias];
{code}
 

{{BIN BY}} also works as a SQL pipe-operator stage on par with {{PIVOT}} / 
{{UNPIVOT}}:

 

 
{code:sql}
FROM relation |> BIN BY (
  RANGE rangeStartCol TO rangeEndCol
  BIN WIDTH widthExpr
  DISTRIBUTE UNIFORM (distributeCol)
);
{code}
  
h2. Clauses

 
 * {{RANGE rangeStartCol TO rangeEndCol}}: two TIMESTAMP or TIMESTAMP_NTZ 
columns from the input relation defining each row's measurement window 
{{[range_start, range_end)}}. Both columns must have the same type.
 * {{BIN WIDTH widthExpr}}: a day-time interval expression defining the bin 
size. Must be positive and foldable.
 * {{ALIGN TO originExpr}}: optional alignment anchor for the bin grid. Must be 
the same type as the range columns and must be foldable. When omitted, the 
default origin is {{1970-01-01 00:00:00}}, session-zone-anchored for TIMESTAMP 
(LTZ) and wall-clock (epoch) for TIMESTAMP_NTZ.
 * {{DISTRIBUTE UNIFORM (col ...)}}: one or more FLOAT or DOUBLE columns whose 
values are proportionally redistributed across sub-rows. Required; at least one 
column must be listed.
 * {{BIN_START AS}} / {{BIN_END AS}} / {{BIN_DISTRIBUTE_RATIO AS}}: optional 
rename clauses for the three appended output columns. When omitted, the default 
names ({{bin_start}}, {{bin_end}}, {{bin_distribute_ratio}}) are used.
 * {{AS resultAlias}}: optional table alias on the whole {{BIN BY}} relation 
(matches the {{PIVOT}} / {{UNPIVOT}} convention).

 
h2. Output schema

  

Input columns, plus three appended columns (default names shown; renameable via 
the optional {{AS}} clauses):
 * {{bin_start}} ({{TIMESTAMP}} or {{TIMESTAMP_NTZ}}, matches the input range 
column type)
 * {{bin_end}} (same type as {{bin_start}})
 * {{bin_distribute_ratio}} ({{DOUBLE}}): fraction of the original 
{{[range_start, range_end)}} duration that fell into this bin. {{1.0}} for 
unsplit rows.

 
h2. Behavior

 
 * For each input row, the operator emits one output row per bin that overlaps 
{{[range_start, range_end)}}.
 * For each sub-row: the appended {{bin_start}} / {{bin_end}} hold the bin's 
boundaries; the input range columns are truncated to the sub-range (i.e., 
clipped to the bin's intersection with {{[range_start, range_end)}}); 
{{DISTRIBUTE UNIFORM}} columns are scaled by the fraction of the original range 
that falls inside the bin; all other input columns are replicated unchanged.
 * {{bin_distribute_ratio}} reports the per-sub-row fraction.
 * For TIMESTAMP, calendar-day components of day-time interval bins are aligned 
to the session time zone. Sub-day bins stay on pure microsecond arithmetic 
regardless of zone.
 * TIMESTAMP_NTZ bin boundaries use UTC arithmetic.

 
h2. Examples

 

 
{code:sql}
-- 5-minute alignment, single value column, default output names, default origin
SELECT * FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 5 MINUTES
  DISTRIBUTE UNIFORM (value)
);

-- Custom origin, hourly bins, renamed boundary columns
SELECT * FROM metrics BIN BY (
  RANGE ts_begin TO ts_end
  BIN WIDTH INTERVAL 1 HOUR
  ALIGN TO TIMESTAMP '2024-01-01 00:30:00'
  DISTRIBUTE UNIFORM (bytes_sent, requests)
  BIN_START AS window_start
  BIN_END AS window_end
);

-- Composed with GROUP BY for downstream aggregation
SELECT bin_start, SUM(value) AS total
FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
)
GROUP BY bin_start
ORDER BY bin_start;

-- Trailing alias on the relation result
SELECT b.bin_start, SUM(b.value) AS total
FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
) AS b
GROUP BY b.bin_start;

-- SQL pipe-operator form
FROM metrics |> BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
);
{code}
  


> Add BIN BY relation operator for aligning range-typed rows to fixed bin 
> boundaries
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-57133
>                 URL: https://issues.apache.org/jira/browse/SPARK-57133
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 5.0.0
>            Reporter: Nikolina Vraneš
>            Assignee: Nikolina Vraneš
>            Priority: Major
>              Labels: pull-request-available
>


