[ https://issues.apache.org/jira/browse/FLINK-39051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-39051:
-----------------------------------
    Labels: pull-request-available  (was: )

> Support APPROX_COUNT_DISTINCT aggregate function in streaming mode with Window TVF
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-39051
>                 URL: https://issues.apache.org/jira/browse/FLINK-39051
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Liu
>            Priority: Major
>              Labels: pull-request-available
>
> h1. Motivation
> Currently, the APPROX_COUNT_DISTINCT aggregate function only supports batch 
> mode (see BatchApproxCountDistinctAggFunctions.java). When users try to use 
> this function in streaming SQL, they encounter the following error:
> {code:java}
> org.apache.flink.table.api.TableException: 
> APPROX_COUNT_DISTINCT aggregate function does not support yet for streaming. 
> {code}
> This limitation prevents users from leveraging approximate distinct counting 
> for real-time analytics use cases such as:
>  * Real-time UV (Unique Visitor) counting in web analytics
>  * Real-time user activity deduplication
>  * Approximate cardinality estimation in streaming pipelines
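>
> Since Flink's HyperLogLog++ implementation already supports the merge()
> operation, which combines two sketches into one that estimates the
> cardinality of the union of their inputs, it is technically feasible to
> extend APPROX_COUNT_DISTINCT to work with streaming Window TVF (TUMBLE,
> HOP, CUMULATE).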
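The merge() property can be illustrated with a deliberately simplified HyperLogLog in plain Java. This is a sketch under stated assumptions: classic HLL with linear counting for small cardinalities, not Flink's HyperLogLog++ class; the class name `HllSketch`, the FNV-1a/SplitMix64 hash and the 12-bit precision are illustrative choices. Each sketch keeps one small register per bucket, and merging is a register-wise maximum, so the merged sketch is identical to one built from the union of both inputs.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical, simplified HyperLogLog for illustration only:
// plain HLL with linear counting, NOT Flink's HyperLogLogPlusPlus.
final class HllSketch {
    private final int p;            // index bits; m = 2^p registers
    private final int m;
    private final byte[] registers; // register i = max rank seen in bucket i

    HllSketch(int p) {
        if (p < 7 || p > 16) throw new IllegalArgumentException("p must be in [7, 16]");
        this.p = p;
        this.m = 1 << p;
        this.registers = new byte[m];
    }

    // Assumed hash: FNV-1a over UTF-8 bytes, then the SplitMix64 finalizer to spread bits.
    private static long hash64(String value) {
        long h = 0xcbf29ce484222325L;
        for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        h = (h ^ (h >>> 30)) * 0xbf58476d1ce4e5b9L;
        h = (h ^ (h >>> 27)) * 0x94d049bb133111ebL;
        return h ^ (h >>> 31);
    }

    void add(String value) {
        long h = hash64(value);
        int idx = (int) (h >>> (64 - p)); // top p bits pick the register
        long rest = h << p;               // remaining 64 - p bits
        // rank = 1-based position of the leftmost 1-bit in the remaining bits
        int rank = Math.min(Long.numberOfLeadingZeros(rest), 64 - p) + 1;
        if (rank > registers[idx]) registers[idx] = (byte) rank;
    }

    // Register-wise max: the result equals a sketch built from the union of both inputs.
    void merge(HllSketch other) {
        if (other.p != p) throw new IllegalArgumentException("precision mismatch");
        for (int i = 0; i < m; i++) {
            registers[i] = (byte) Math.max(registers[i], other.registers[i]);
        }
    }

    // Analogous to resetAccumulator(): return to the empty state.
    void reset() {
        Arrays.fill(registers, (byte) 0);
    }

    double estimate() {
        double alpha = 0.7213 / (1 + 1.079 / m); // bias constant, valid for m >= 128
        double sum = 0;
        int zeros = 0;
        for (byte r : registers) {
            sum += Math.pow(2, -r);
            if (r == 0) zeros++;
        }
        double e = alpha * m * m / sum;
        if (e <= 2.5 * m && zeros > 0) {
            e = m * Math.log((double) m / zeros); // linear counting for small cardinalities
        }
        return e;
    }
}

public class HllMergeDemo {
    public static void main(String[] args) {
        HllSketch slice1 = new HllSketch(12);
        HllSketch slice2 = new HllSketch(12);
        for (int i = 0; i < 6000; i++) slice1.add("user-" + i);     // users 0..5999
        for (int i = 4000; i < 10000; i++) slice2.add("user-" + i); // users 4000..9999
        slice1.merge(slice2); // the 2000 shared users are counted once
        System.out.printf("merged estimate: %.0f (exact union: 10000)%n", slice1.estimate());
    }
}
```

Because add() and merge() only ever raise a register to a maximum, and nothing records which value produced that maximum, there is no inverse operation that could undo an add(). This is the same reason retraction stays out of scope in the proposal.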
> h1. Proposed Changes
>  # Create a unified ApproxCountDistinctAggFunctions class that supports both 
> batch and streaming modes:
>  ** Implement the merge() method to enable Window TVF support (TUMBLE, HOP, 
> CUMULATE)
>  ** Implement the resetAccumulator() method for proper state management
>  ** Support all existing data types: TINYINT, SMALLINT, INT, BIGINT, FLOAT, 
> DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, VARCHAR
>  # Modify AggFunctionFactory.scala to:
>  ** Remove the blanket restriction that rejects APPROX_COUNT_DISTINCT in
> every streaming query
>  ** Throw an exception only when retraction is required (non-windowed
> streaming aggregation), with a clear error message directing users to
> Window TVF
>  # Add comprehensive tests:
>  ** Unit tests for all data types and the merge() method
>  ** Integration tests for TUMBLE, HOP, and CUMULATE windows
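The proposed AggFunctionFactory change amounts to replacing an execution-mode check with a retraction check. A minimal Java sketch of that decision, assuming a hypothetical `ApproxCountDistinctCheck.validate(boolean needsRetraction)` helper; the real change is in Scala, and Flink would raise a `TableException` rather than the `UnsupportedOperationException` used here to keep the example dependency-free:

```java
// Hypothetical helper illustrating the proposed AggFunctionFactory check;
// the real code is Scala and would throw Flink's TableException instead.
final class ApproxCountDistinctCheck {
    private ApproxCountDistinctCheck() {}

    // Before: every streaming query was rejected.
    // Proposed: reject only aggregations that must process retractions,
    // because an HLL sketch cannot remove a value once it has been added.
    static void validate(boolean needsRetraction) {
        if (needsRetraction) {
            throw new UnsupportedOperationException(
                    "APPROX_COUNT_DISTINCT does not support retraction. "
                            + "In streaming mode, use it with a Window TVF "
                            + "(TUMBLE, HOP or CUMULATE).");
        }
        // No execution-mode check: batch and Window TVF streaming share one implementation.
    }
}

public class CheckDemo {
    public static void main(String[] args) {
        ApproxCountDistinctCheck.validate(false); // batch or Window TVF aggregation: accepted
        try {
            ApproxCountDistinctCheck.validate(true); // non-windowed aggregation with retraction
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```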
> h1. Scope and Limitations
>  * *Supported:* Streaming mode with Window TVF (TUMBLE, HOP, CUMULATE)
> **
>  * *Not Supported:* Non-windowed streaming aggregation with retraction,
> because a HyperLogLog sketch only records the maximum rank seen in each
> register and therefore cannot remove an element once it has been added
> h1. Example Usage
> {code:sql}
> -- TUMBLE window
> SELECT
>   window_start,
>   window_end,
>   APPROX_COUNT_DISTINCT(user_id) AS approx_uv
> FROM TABLE(
>   TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
> )
> GROUP BY window_start, window_end;
>
> -- CUMULATE window (requires merge() support)
> SELECT
>   window_start,
>   window_end,
>   APPROX_COUNT_DISTINCT(user_id) AS cumulative_uv
> FROM TABLE(
>   CUMULATE(TABLE events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
> )
> GROUP BY window_start, window_end;
> {code}
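Why CUMULATE needs merge() can be seen in a simplified model of sliced window evaluation: with a 5-minute step and a 1-hour size, each event updates only the state of its 5-minute slice, and the window [start, start + k * 5 min) combines the first k slice states. To keep the output easy to check, this sketch uses an exact `HashSet` as a stand-in for the HLL accumulator (its merge is a set union; an HLL merge would be a register-wise maximum). The class and method names are hypothetical, and this is not Flink's actual window operator.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of one CUMULATE window (step 5 min, size 60 min) starting at minute 0.
public class CumulateSliceDemo {
    static final int STEP_MINUTES = 5;
    static final int SIZE_MINUTES = 60;

    // Returns distinct counts for windows [0, 5), [0, 10), ..., [0, 60).
    static List<Integer> cumulativeDistinct(int[] eventMinute, String[] userId) {
        int slices = SIZE_MINUTES / STEP_MINUTES;
        List<Set<String>> sliceState = new ArrayList<>();
        for (int i = 0; i < slices; i++) sliceState.add(new HashSet<>());

        // accumulate(): each event touches only the slice it falls into
        for (int i = 0; i < eventMinute.length; i++) {
            if (eventMinute[i] < 0 || eventMinute[i] >= SIZE_MINUTES) {
                throw new IllegalArgumentException("minute out of window: " + eventMinute[i]);
            }
            sliceState.get(eventMinute[i] / STEP_MINUTES).add(userId[i]);
        }

        // merge(): each cumulative window combines all slices it covers
        List<Integer> result = new ArrayList<>();
        Set<String> merged = new HashSet<>();
        for (Set<String> slice : sliceState) {
            merged.addAll(slice); // exact union; an HLL would take a register-wise max
            result.add(merged.size());
        }
        return result;
    }

    public static void main(String[] args) {
        int[] minutes = {1, 3, 7, 8, 12, 59};
        String[] users = {"a", "b", "a", "c", "b", "d"};
        // prints [2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4]
        System.out.println(cumulativeDistinct(minutes, users));
    }
}
```

User "a" appears in the first two slices but is counted once in every window that covers both, which is exactly what merging the per-slice accumulators provides.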



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
