Liu created FLINK-39051:
---------------------------
Summary: Support APPROX_COUNT_DISTINCT aggregate function in
streaming mode with Window TVF
Key: FLINK-39051
URL: https://issues.apache.org/jira/browse/FLINK-39051
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Liu
h1. Motivation
Currently, the APPROX_COUNT_DISTINCT aggregate function only supports batch
mode (see BatchApproxCountDistinctAggFunctions.java). When users try to use
this function in streaming SQL, they encounter the following error:
{code:java}
org.apache.flink.table.api.TableException:
APPROX_COUNT_DISTINCT aggregate function does not support yet for streaming.
{code}
This limitation prevents users from leveraging approximate distinct counting
for real-time analytics use cases such as:
* Real-time UV (Unique Visitor) counting in web analytics
* Real-time user activity deduplication
* Approximate cardinality estimation in streaming pipelines
Since Flink's HyperLogLog++ implementation already supports the merge()
operation, it is technically feasible to extend APPROX_COUNT_DISTINCT to work
with streaming Window TVF (TUMBLE, HOP, CUMULATE).
h1. Proposed Changes
# Create a unified ApproxCountDistinctAggFunctions class that supports both
batch and streaming modes:
** Implement the merge() method to enable Window TVF support (TUMBLE, HOP,
CUMULATE)
** Implement the resetAccumulator() method for proper state management
** Support all existing data types: TINYINT, SMALLINT, INT, BIGINT, FLOAT,
DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, VARCHAR
# Modify AggFunctionFactory.scala to:
** Remove the blanket restriction on streaming mode
** Only throw an exception when retraction is required (non-windowed streaming
aggregation), with a clear error message guiding users to use Window TVF
# Add comprehensive tests:
** Unit tests for all data types and the merge() method
** Integration tests for TUMBLE, HOP, and CUMULATE windows
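The intended change to the planner-side check can be sketched as follows. This is only an illustration of the decision logic described above: the class name, the method, and its two boolean parameters are hypothetical and are not existing Flink APIs, and the error text is a placeholder rather than the final message.

```java
/**
 * Hypothetical stand-in for the check in AggFunctionFactory.scala.
 * All names here are illustrative assumptions, not Flink APIs.
 */
final class ApproxCountDistinctSupportCheck {

    private ApproxCountDistinctSupportCheck() {}

    /**
     * Current behaviour rejects every streaming query. The proposed behaviour
     * rejects only queries whose aggregation has to process retractions.
     */
    static void validate(boolean isStreaming, boolean needsRetraction) {
        if (isStreaming && needsRetraction) {
            throw new UnsupportedOperationException(
                    "APPROX_COUNT_DISTINCT does not support retraction in streaming mode. "
                            + "Use it with a Window TVF (TUMBLE, HOP, CUMULATE) instead.");
        }
        // Batch queries and windowed streaming queries pass through unchanged.
    }
}
```

With this shape, a windowed streaming query (isStreaming = true, needsRetraction = false) is accepted, while a non-windowed streaming aggregation that requires retraction is rejected with a message pointing users to Window TVF.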
h1. Scope and Limitations
* *Supported:* Streaming mode with Window TVF (TUMBLE, HOP, CUMULATE)
* *Not Supported:* Non-windowed streaming aggregation with retraction, because
a HyperLogLog sketch keeps only the maximum rank observed per register, so an
element cannot be removed once it has been added
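To illustrate why merge() and resetAccumulator() are straightforward while retraction is not, here is a minimal, self-contained sketch of a plain HyperLogLog accumulator. It is a simplified stand-in and not Flink's HyperLogLog++ code: the class HllSketch, its methods, the register count, and the hash function are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Simplified HyperLogLog sketch; illustrative only, not Flink's implementation. */
final class HllSketch {
    private static final int P = 12;      // number of index bits (assumed)
    private static final int M = 1 << P;  // 4096 registers
    private final byte[] registers = new byte[M];

    /** Adds a value: each register keeps only the maximum rank it has seen. */
    void add(String value) {
        long h = hash64(value);
        int idx = (int) (h >>> (64 - P));
        long rest = h << P;
        int rank = Math.min(Long.numberOfLeadingZeros(rest) + 1, 64 - P + 1);
        if (rank > registers[idx]) {
            registers[idx] = (byte) rank;
        }
    }

    /** Merging is a register-wise max, so the result equals the sketch of the union. */
    void merge(HllSketch other) {
        for (int i = 0; i < M; i++) {
            registers[i] = (byte) Math.max(registers[i], other.registers[i]);
        }
    }

    /** Resetting clears all registers, which is what resetAccumulator() would need. */
    void reset() {
        Arrays.fill(registers, (byte) 0);
    }

    // There is deliberately no remove(): once a register holds a maximum, the
    // sketch cannot tell which element produced it, so retraction is impossible.

    /** Raw HyperLogLog estimate with the small-range (linear counting) correction. */
    long estimate() {
        double sum = 0.0;
        int zeros = 0;
        for (byte r : registers) {
            sum += Math.pow(2.0, -r);
            if (r == 0) {
                zeros++;
            }
        }
        double alpha = 0.7213 / (1.0 + 1.079 / M);
        double raw = alpha * M * M / sum;
        if (raw <= 2.5 * M && zeros > 0) {
            raw = M * Math.log((double) M / zeros);
        }
        return Math.round(raw);
    }

    /** FNV-1a over UTF-8 bytes followed by a 64-bit mixing finalizer (assumed hash). */
    private static long hash64(String s) {
        long h = 0xcbf29ce484222325L;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;
        return h;
    }
}
```

Because merge() is a register-wise maximum, merging the sketches of two window slices yields exactly the same registers as a sketch built over the combined input. Window TVF aggregation relies on this property when it combines partial results, for example across the slices of a HOP or CUMULATE window.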
h1. Example Usage
{code:sql}
-- TUMBLE window
SELECT
  window_start,
  window_end,
  APPROX_COUNT_DISTINCT(user_id) AS approx_uv
FROM TABLE(
  TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;

-- CUMULATE window (requires merge() support)
SELECT
  window_start,
  window_end,
  APPROX_COUNT_DISTINCT(user_id) AS cumulative_uv
FROM TABLE(
  CUMULATE(TABLE events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;
{code}