Liu created FLINK-39051:
---------------------------

             Summary: Support APPROX_COUNT_DISTINCT aggregate function in 
streaming mode with Window TVF
                 Key: FLINK-39051
                 URL: https://issues.apache.org/jira/browse/FLINK-39051
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Liu


h1. Motivation

Currently, the APPROX_COUNT_DISTINCT aggregate function only supports batch 
mode (see BatchApproxCountDistinctAggFunctions.java). When users try to use 
this function in streaming SQL, they encounter the following error:
{code:java}
org.apache.flink.table.api.TableException: 
APPROX_COUNT_DISTINCT aggregate function does not support yet for streaming. 
{code}
This limitation prevents users from leveraging approximate distinct counting 
for real-time analytics use cases such as:
 * Real-time UV (Unique Visitor) counting in web analytics
 * Real-time user activity deduplication
 * Approximate cardinality estimation in streaming pipelines

Since Flink's HyperLogLog++ implementation already supports the merge() 
operation, it is technically feasible to extend APPROX_COUNT_DISTINCT to work 
with streaming Window TVF (TUMBLE, HOP, CUMULATE).
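
To illustrate why merge() makes window aggregation feasible, here is a minimal sketch of a HyperLogLog-style register array. It is a simplified stand-in, not Flink's HyperLogLog++ code: the class name, the precision P, and the mixing function are assumptions made for the example. It shows that merging two partial sketches (a register-wise maximum) gives the same registers as building one sketch over all of the input, which is the property slice-based window merging relies on.

```java
import java.util.Arrays;

// Hypothetical, simplified HyperLogLog-style sketch (not Flink's implementation).
final class HllMergeSketch {
    // Assumed precision: 2^P registers.
    static final int P = 10;
    static final int M = 1 << P;

    final byte[] registers = new byte[M];

    // 64-bit mixer standing in for a real hash function.
    static long mix(long x) {
        x += 0x9e3779b97f4a7c15L;
        x = (x ^ (x >>> 30)) * 0xbf58476d1ce4e5b9L;
        x = (x ^ (x >>> 27)) * 0x94d049bb133111ebL;
        return x ^ (x >>> 31);
    }

    void add(long value) {
        long h = mix(value);
        int idx = (int) (h >>> (64 - P));   // top P bits select the register
        long rest = h << P;                 // remaining bits determine the rank
        int rank = Math.min(Long.numberOfLeadingZeros(rest) + 1, 64 - P + 1);
        if (rank > registers[idx]) {
            registers[idx] = (byte) rank;   // keep only the maximum rank seen
        }
    }

    // Merging is a register-wise maximum, so it is order-independent.
    void merge(HllMergeSketch other) {
        for (int i = 0; i < M; i++) {
            if (other.registers[i] > registers[i]) {
                registers[i] = other.registers[i];
            }
        }
    }

    // Builds two overlapping partial sketches and compares their merge
    // with a single sketch built over the union of the inputs.
    static boolean mergeEqualsUnion() {
        HllMergeSketch a = new HllMergeSketch();
        HllMergeSketch b = new HllMergeSketch();
        HllMergeSketch all = new HllMergeSketch();
        for (long v = 0; v < 500; v++) { a.add(v); all.add(v); }
        for (long v = 300; v < 1000; v++) { b.add(v); all.add(v); }
        a.merge(b);
        return Arrays.equals(a.registers, all.registers);
    }

    public static void main(String[] args) {
        System.out.println(mergeEqualsUnion());
    }
}
```

Because the merged registers match those of the union, per-slice accumulators can be combined into a window result without revisiting the raw rows.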

h1. Proposed Changes
 # Create a unified ApproxCountDistinctAggFunctions class that supports both 
batch and streaming modes:

 ** Implement the merge() method to enable Window TVF support (TUMBLE, HOP, 
CUMULATE)
 ** Implement the resetAccumulator() method for proper state management
 ** Support all existing data types: TINYINT, SMALLINT, INT, BIGINT, FLOAT, 
DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, VARCHAR
 # Modify AggFunctionFactory.scala to:

 ** Remove the blanket restriction that rejects the function in streaming mode
 ** Only throw an exception when retraction is required (non-windowed streaming 
aggregation), with a clear error message guiding users to use Window TVF
 # Add comprehensive tests:

 ** Unit tests for all data types and the merge() method
 ** Integration tests for TUMBLE, HOP, and CUMULATE windows
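
The relaxed check proposed for AggFunctionFactory could look roughly like the sketch below. It is only an illustration of the intended rule: the class, method, and parameter names are hypothetical, the message wording is a placeholder, and UnsupportedOperationException stands in for Flink's TableException so that the snippet is self-contained.

```java
// Hypothetical guard illustrating the proposed rule; not the actual planner code.
final class ApproxCountDistinctGuard {

    // Rejects only the case that needs retraction (non-windowed streaming
    // aggregation); batch and windowed streaming plans pass through.
    static void check(boolean isStreaming, boolean needsRetraction) {
        if (isStreaming && needsRetraction) {
            throw new UnsupportedOperationException(
                "APPROX_COUNT_DISTINCT does not support retraction in streaming mode. "
                    + "Use it with a Window TVF (TUMBLE, HOP, CUMULATE) instead.");
        }
    }

    // Convenience wrapper that reports the outcome instead of throwing.
    static boolean isSupported(boolean isStreaming, boolean needsRetraction) {
        try {
            check(isStreaming, needsRetraction);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("batch: " + isSupported(false, false));
        System.out.println("streaming, windowed: " + isSupported(true, false));
        System.out.println("streaming, retracting: " + isSupported(true, true));
    }
}
```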

h1. Scope and Limitations
 * *Supported:* Streaming mode with Window TVF (TUMBLE, HOP, CUMULATE)
 * *Not Supported:* Non-windowed streaming aggregation with retraction, because 
HyperLogLog is a probabilistic data structure that cannot remove elements once 
added
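
The reason retraction cannot work can be shown with a single register. In the sketch below (a simplified model with hypothetical names, not Flink code), a register stores only the maximum rank it has seen, so two different input histories collapse into the same state and there is no way to tell what the register should become after one element is removed.

```java
// Hypothetical single-register model of a HyperLogLog-style update.
final class HllRetractionDemo {

    // A register keeps only the maximum rank observed so far.
    static int update(int register, int rank) {
        return Math.max(register, rank);
    }

    public static void main(String[] args) {
        int onlyFive = update(0, 5);
        int threeThenFive = update(update(0, 3), 5);
        // Both histories end in the same state. Retracting the rank-5 element
        // would have to yield 0 in the first case and 3 in the second, but the
        // register no longer holds enough information to decide.
        System.out.println(onlyFive == threeThenFive);
    }
}
```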

h1. Example Usage
{code:sql}
-- TUMBLE window
SELECT
  window_start,
  window_end,
  APPROX_COUNT_DISTINCT(user_id) AS approx_uv
FROM TABLE(
  TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;

-- CUMULATE window (requires merge() support)
SELECT
  window_start,
  window_end,
  APPROX_COUNT_DISTINCT(user_id) AS cumulative_uv
FROM TABLE(
  CUMULATE(TABLE events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
