[ https://issues.apache.org/jira/browse/FLINK-39051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-39051:
-----------------------------------
Labels: pull-request-available (was: )
> Support APPROX_COUNT_DISTINCT aggregate function in streaming mode with Window TVF
> ----------------------------------------------------------------------------------
>
> Key: FLINK-39051
> URL: https://issues.apache.org/jira/browse/FLINK-39051
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: Liu
> Priority: Major
> Labels: pull-request-available
>
> h1. Motivation
> Currently, the APPROX_COUNT_DISTINCT aggregate function only supports batch
> mode (see BatchApproxCountDistinctAggFunctions.java). When users try to use
> this function in streaming SQL, they encounter the following error:
> {code:java}
> org.apache.flink.table.api.TableException:
> APPROX_COUNT_DISTINCT aggregate function does not support yet for streaming.
> {code}
> This limitation prevents users from leveraging approximate distinct counting
> for real-time analytics use cases such as:
> * Real-time UV (Unique Visitor) counting in web analytics
> * Real-time user activity deduplication
> * Approximate cardinality estimation in streaming pipelines
> Since Flink's HyperLogLog++ implementation already supports the merge()
> operation, it is technically feasible to extend APPROX_COUNT_DISTINCT to
> streaming Window TVFs (TUMBLE, HOP, CUMULATE).
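To illustrate why merge() makes windowing feasible, here is a deliberately simplified classic HyperLogLog in plain Java. It is a sketch under stated assumptions, not Flink's HyperLogLogPlusPlus: the class name MiniHll, the FNV-1a/fmix64 hash, and the precision bounds are hypothetical choices for illustration, and it omits HLL++'s sparse encoding and bias correction. Each register keeps only the maximum "rank" (position of the first 1-bit) observed, so merging two sketches is a register-wise max and yields exactly the sketch of the union of their inputs.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical, simplified HyperLogLog for illustration; not Flink's HyperLogLogPlusPlus. */
final class MiniHll {
    private final int p;            // number of index bits (precision)
    private final int m;            // number of registers, 2^p
    private final byte[] registers; // each register holds the max rank observed

    MiniHll(int p) {
        // The alpha constant used in estimate() assumes m >= 128, hence p >= 7.
        if (p < 7 || p > 16) {
            throw new IllegalArgumentException("p must be in [7, 16]");
        }
        this.p = p;
        this.m = 1 << p;
        this.registers = new byte[m];
    }

    // 64-bit FNV-1a over UTF-8 bytes, finished with the MurmurHash3 fmix64 mixer.
    private static long hash64(String value) {
        long h = 0xcbf29ce484222325L;
        for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;
        return h;
    }

    void add(String value) {
        long h = hash64(value);
        int idx = (int) (h >>> (64 - p)); // top p bits select the register
        long rest = h << p;               // remaining 64 - p bits, left-aligned
        // Rank = 1-based position of the first 1-bit in the remaining bits.
        int rank = (rest == 0) ? (64 - p + 1) : Long.numberOfLeadingZeros(rest) + 1;
        if (rank > registers[idx]) {
            registers[idx] = (byte) rank;
        }
    }

    // Register-wise max: the result equals the sketch of the union of both inputs.
    void merge(MiniHll other) {
        if (other.p != p) {
            throw new IllegalArgumentException("precision mismatch");
        }
        for (int i = 0; i < m; i++) {
            if (other.registers[i] > registers[i]) {
                registers[i] = other.registers[i];
            }
        }
    }

    long estimate() {
        double sum = 0;
        int zeros = 0;
        for (byte r : registers) {
            sum += Math.pow(2.0, -r);
            if (r == 0) {
                zeros++;
            }
        }
        double alpha = 0.7213 / (1 + 1.079 / m);
        double raw = alpha * m * m / sum;
        // Small-range correction (linear counting) from the original HyperLogLog algorithm.
        if (raw <= 2.5 * m && zeros > 0) {
            raw = m * Math.log((double) m / zeros);
        }
        return Math.round(raw);
    }

    boolean sameRegisters(MiniHll other) {
        return Arrays.equals(registers, other.registers);
    }

    public static void main(String[] args) {
        MiniHll a = new MiniHll(12);
        MiniHll b = new MiniHll(12);
        MiniHll union = new MiniHll(12);
        for (int i = 0; i < 6000; i++) {
            String user = "user-" + i;
            if (i < 4000) a.add(user);  // partial sketch a: users 0..3999
            if (i >= 2000) b.add(user); // partial sketch b: users 2000..5999
            union.add(user);
        }
        a.merge(b);
        System.out.println("merged equals union: " + a.sameRegisters(union)); // true
        System.out.println("estimate: " + a.estimate() + " (exact: 6000)");
    }
}
```

Because the merged registers match the union sketch exactly, a window operator can build partial sketches per slice or per parallel task and combine them later without revisiting the raw rows.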
> h1. Proposed Changes
> # Create a unified ApproxCountDistinctAggFunctions class that supports both
> batch and streaming modes:
> ** Implement the merge() method to enable Window TVF support (TUMBLE, HOP,
> CUMULATE)
> ** Implement the resetAccumulator() method for proper state management
> ** Support all existing data types: TINYINT, SMALLINT, INT, BIGINT, FLOAT,
> DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, VARCHAR
> # Modify AggFunctionFactory.scala to:
> ** Remove the blanket restriction on streaming mode
> ** Throw an exception only when retraction is required (non-windowed
> streaming aggregation), with a clear error message directing users to
> Window TVFs
> # Add comprehensive tests:
> ** Unit tests for all data types and the merge() method
> ** Integration tests for TUMBLE, HOP, and CUMULATE windows
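The planner-side change in item 2 amounts to narrowing a guard. The sketch below is plain Java with hypothetical names (ApproxCountDistinctGate, validateCurrent, validateProposed, isBounded, needsRetraction); it does not reproduce AggFunctionFactory.scala, it throws UnsupportedOperationException where the planner would raise a TableException, and the proposed error text is illustrative only.

```java
/** Hypothetical illustration of the old vs. proposed validation rule. */
final class ApproxCountDistinctGate {
    // Current behaviour described in the issue: reject every streaming (unbounded) use.
    static void validateCurrent(boolean isBounded) {
        if (!isBounded) {
            throw new UnsupportedOperationException(
                "APPROX_COUNT_DISTINCT aggregate function does not support yet for streaming.");
        }
    }

    // Proposed behaviour: reject only aggregations whose input requires retraction.
    static void validateProposed(boolean needsRetraction) {
        if (needsRetraction) {
            throw new UnsupportedOperationException(
                "APPROX_COUNT_DISTINCT does not support retraction. "
                    + "Use a Window TVF (TUMBLE, HOP, CUMULATE) instead.");
        }
    }

    // Returns true if the check passes, false if it rejects the aggregation.
    static boolean accepted(Runnable check) {
        try {
            check.run();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Streaming Window TVF aggregation: unbounded input, no retraction needed.
        System.out.println(accepted(() -> validateCurrent(false)));  // false
        System.out.println(accepted(() -> validateProposed(false))); // true
        // Non-windowed streaming aggregation over an updating input: retraction needed.
        System.out.println(accepted(() -> validateProposed(true)));  // false
    }
}
```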
> h1. Scope and Limitations
> * *Supported:* Streaming mode with Window TVF (TUMBLE, HOP, CUMULATE)
> * *Not Supported:* Non-windowed streaming aggregation with retraction,
> because HyperLogLog is a probabilistic summary: each register keeps only
> the maximum rank observed, so elements cannot be removed once added
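The retraction limitation follows from how HyperLogLog registers are updated: each keeps only the maximum rank seen. In this hypothetical demo (HllRetractionDemo), hashing is skipped and (register, rank) pairs are supplied directly as assumed inputs. Two different insertion histories end in identical state, so no retract() could restore the correct register value for both.

```java
import java.util.Arrays;

/** Hypothetical demo: max-based registers cannot be "un-added". */
final class HllRetractionDemo {
    // Applies events of the form {registerIndex, rank}; each register keeps only the max rank.
    static byte[] observe(int registerCount, int[][] events) {
        byte[] regs = new byte[registerCount];
        for (int[] e : events) {
            regs[e[0]] = (byte) Math.max(regs[e[0]], e[1]);
        }
        return regs;
    }

    public static void main(String[] args) {
        // History A: two elements landed in register 0 with ranks 2 and 5.
        byte[] a = observe(4, new int[][] {{0, 2}, {0, 5}});
        // History B: only the rank-5 element was ever added.
        byte[] b = observe(4, new int[][] {{0, 5}});
        System.out.println(Arrays.equals(a, b)); // true
        // Retracting the rank-5 element should leave register 0 at 2 for A but at 0 for B.
        // The two states are identical, so no retract() can be correct for both histories.
    }
}
```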
> h1. Example Usage
> {code:sql}
> -- TUMBLE window
> SELECT
>   window_start,
>   window_end,
>   APPROX_COUNT_DISTINCT(user_id) AS approx_uv
> FROM TABLE(
>   TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
> )
> GROUP BY window_start, window_end;
>
> -- CUMULATE window (requires merge() support)
> SELECT
>   window_start,
>   window_end,
>   APPROX_COUNT_DISTINCT(user_id) AS cumulative_uv
> FROM TABLE(
>   CUMULATE(TABLE events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
> )
> GROUP BY window_start, window_end;
> {code}
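The CUMULATE example needs merge() because its windows can be evaluated by combining step-sized slices. The sketch below (hypothetical CumulateSliceMergeDemo) simulates a CUMULATE window with a 5-minute step: each slice gets its own accumulator, and every cumulative window result is the merge of the slices seen so far. A HashSet stands in for the HLL sketch so the counts are exact and easy to check; it illustrates the slice-and-merge pattern under that assumption, not Flink's actual window operator internals.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical simulation of slice-and-merge for a CUMULATE window. */
final class CumulateSliceMergeDemo {
    // Returns the distinct count for each cumulative window ending at slice 1..n.
    // HashSet is an exact stand-in for an HLL accumulator.
    static List<Integer> cumulativeDistinct(List<List<String>> slices) {
        List<Integer> results = new ArrayList<>();
        Set<String> running = new HashSet<>();
        for (List<String> slice : slices) {
            Set<String> sliceAcc = new HashSet<>(slice); // accumulate() within one slice
            running.addAll(sliceAcc);                    // merge() the slice into the window
            results.add(running.size());                 // getValue() for the window ending here
        }
        return results;
    }

    public static void main(String[] args) {
        List<List<String>> slices = List.of(
            List.of("u1", "u2", "u1"), // slice [00:00, 00:05)
            List.of("u2", "u3"),       // slice [00:05, 00:10)
            List.of("u4", "u1"));      // slice [00:10, 00:15)
        System.out.println(cumulativeDistinct(slices)); // [2, 3, 4]
    }
}
```

Each per-slice accumulator is built once and reused by every cumulative window that covers it, which is only possible when the accumulator type supports merge().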
--
This message was sent by Atlassian Jira
(v8.20.10#820010)