featzhang created FLINK-39062:
---------------------------------

             Summary: Support Watermark Definition in SQL Views
                 Key: FLINK-39062
                 URL: https://issues.apache.org/jira/browse/FLINK-39062
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / API, Table SQL / Planner
            Reporter: featzhang


h3. Summary

Flink SQL views currently do not support watermark definitions, which limits 
their effectiveness in event-time processing. Users cannot define watermarks on 
views, forcing them to expose underlying table structures and duplicate 
temporal logic across queries.

This issue proposes adding watermark support for views through two new SQL 
syntax options:
 # {{CREATE VIEW ... WATERMARK FOR ...}}
 # {{ALTER VIEW SET WATERMARK FOR ...}}

----
h3. Problem Statement

*Current Limitations:*

Views in Flink SQL are logical constructs that do not support watermark 
definitions. This creates several limitations:
 # ❌ *Broken Abstraction* - Users must reference the watermarks of underlying tables directly
 # ❌ *No Strategy Flexibility* - Different downstream use cases cannot use different watermark strategies
 # ❌ *Code Duplication* - Temporal logic must be repeated in every query
 # ❌ *Limited Architecture Support* - Layered data architectures (Bronze/Silver/Gold) cannot declare event time in the layer where the timestamp column is derived

*Example of Current Workaround:*
{code:sql}
-- Source table with watermark
CREATE TABLE raw_events (
    event_id BIGINT,
    event_time TIMESTAMP(3),
    data STRING,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (...);

-- View without watermark (watermark inherited implicitly)
CREATE VIEW cleaned_events AS 
SELECT event_id, event_time FROM raw_events WHERE data IS NOT NULL;

-- Query must assume watermark from raw_events
SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
FROM cleaned_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
{code}
Problems:
 * The view cannot define its own watermark strategy
 * Multiple views with different watermark strategies cannot be created on the same source
 * Watermark propagation is implicit and unclear

----
h3. Proposed Solution
h4. Syntax 1: CREATE VIEW with WATERMARK
{code:sql}
CREATE VIEW view_name
WATERMARK FOR time_column AS watermark_expression
AS SELECT ...;
{code}
*Example:*
{code:sql}
CREATE VIEW user_activity
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT user_id, event_time, action_type
FROM raw_events
WHERE action_type IN ('click', 'view', 'purchase');
{code}
h4. Syntax 2: ALTER VIEW SET WATERMARK
{code:sql}
ALTER VIEW view_name SET WATERMARK FOR time_column AS watermark_expression;
{code}
*Example:*
{code:sql}
-- Create view first
CREATE VIEW user_activity AS 
SELECT user_id, event_time, action_type FROM raw_events;

-- Add watermark later
ALTER VIEW user_activity 
SET WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND;
{code}
----
h3. Use Cases
h4. Use Case 1: Multi-Tenant Scenarios

Different tenants require different tolerances for late data:
{code:sql}
-- Source table (no watermark)
CREATE TABLE all_events (
    event_id BIGINT,
    event_time TIMESTAMP(3),
    tenant_id STRING,
    data STRING
) WITH ('connector' = 'kafka', ...);

-- Tenant A: Low-latency (5-second tolerance)
CREATE VIEW tenant_a_events
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT * FROM all_events WHERE tenant_id = 'tenant_a';

-- Tenant B: Late-data tolerant (30-second tolerance)
CREATE VIEW tenant_b_events
WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
AS SELECT * FROM all_events WHERE tenant_id = 'tenant_b';
{code}
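To illustrate the different tolerances, the following minimal sketch models a bounded-delay watermark such as {{event_time - INTERVAL '5' SECOND}}. The watermark is the largest {{event_time}} observed so far minus the delay, so it never moves backwards. The class {{BoundedDelayWatermark}} is hypothetical and not Flink code; the sketch assumes epoch-millisecond timestamps and ignores periodic watermark emission.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Flink's runtime code: a bounded-delay watermark
// equals the largest event_time seen so far minus the delay, so it never decreases.
final class BoundedDelayWatermark {
    private final long delayMillis;
    private long maxEventTime = Long.MIN_VALUE;

    BoundedDelayWatermark(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Observes one record's event_time (epoch millis) and returns the watermark after it. */
    long onEvent(long eventTimeMillis) {
        maxEventTime = Math.max(maxEventTime, eventTimeMillis);
        return maxEventTime - delayMillis;
    }

    /** Returns the watermark after each record, in arrival order. */
    static List<Long> trace(long delayMillis, long... eventTimes) {
        BoundedDelayWatermark generator = new BoundedDelayWatermark(delayMillis);
        List<Long> watermarks = new ArrayList<>();
        for (long t : eventTimes) {
            watermarks.add(generator.onEvent(t));
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // Records arrive with event times 10s, 12s, 8s (out of order) and 20s.
        long[] events = {10_000, 12_000, 8_000, 20_000};
        System.out.println("tenant_a_events (5s):  " + trace(5_000, events));  // [5000, 7000, 7000, 15000]
        System.out.println("tenant_b_events (30s): " + trace(30_000, events)); // [-20000, -18000, -18000, -10000]
    }
}
{code}

In this model, an hourly window ending at T can fire for {{tenant_a_events}} once a record with an {{event_time}} of about T + 5s has arrived. {{tenant_b_events}} waits until about T + 30s, trading latency for fewer late records.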
h4. Use Case 2: Data Lakehouse Architecture

Bronze/Silver/Gold layers with progressive watermark strategies:
{code:sql}
-- Bronze: Raw data (no watermark)
CREATE TABLE bronze_events (raw_data STRING, ingestion_time TIMESTAMP(3)) 
WITH (...);

-- Silver: Cleaned data with watermark
CREATE VIEW silver_events
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
AS SELECT 
    CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3)) AS event_time,
    JSON_VALUE(raw_data, '$.user_id') AS user_id
FROM bronze_events
WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;

-- Gold: Business aggregations
CREATE VIEW gold_hourly_stats AS
SELECT 
    TUMBLE_START(event_time, INTERVAL '1' HOUR) AS hour,
    user_id,
    COUNT(*) AS event_count
FROM silver_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), user_id;
{code}
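The Gold layer relies on the Silver view's watermark to decide when each hourly window is complete. The following minimal sketch shows that interaction. It assumes epoch-aligned windows without an offset and epoch-millisecond timestamps. It also assumes that a window {{[start, end)}} can fire once the watermark reaches {{end - 1}}. The class and method names are hypothetical.

{code:java}
// Hypothetical sketch of TUMBLE(event_time, INTERVAL '1' HOUR): epoch-aligned windows, no offset.
final class TumblingWindows {
    static final long HOUR_MS = 60L * 60 * 1000;

    /** Start of the tumbling window [start, start + size) that contains tsMillis. */
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    /** The window may fire once the watermark reaches its last millisecond (end - 1). */
    static boolean canFire(long windowStart, long sizeMillis, long watermarkMillis) {
        return watermarkMillis >= windowStart + sizeMillis - 1;
    }

    public static void main(String[] args) {
        long ts = 2 * HOUR_MS + 15 * 60 * 1000;  // 02:15 on 1970-01-01 (UTC)
        long start = windowStart(ts, HOUR_MS);   // 02:00
        // silver_events uses a 10-second delay, so the watermark reaches 02:59:59.999
        // only after a record with event_time >= 03:00:09.999 has been seen.
        long watermark = (3 * HOUR_MS + 9_999) - 10_000;
        System.out.println(start == 2 * HOUR_MS);                   // true
        System.out.println(canFire(start, HOUR_MS, watermark));     // true
        System.out.println(canFire(start, HOUR_MS, watermark - 1)); // false
    }
}
{code}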
h4. Use Case 3: Data Quality Monitoring

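This view detects records that arrive more than 60 seconds after their event time. It assumes the source also provides {{data_source}} and {{processing_time}} columns, which {{all_events}} in Use Case 1 does not declare: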
{code:sql}
CREATE VIEW late_data_monitor
WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
AS SELECT 
    data_source,
    event_time,
    processing_time,
    TIMESTAMPDIFF(SECOND, event_time, processing_time) AS latency_seconds
FROM all_events
WHERE TIMESTAMPDIFF(SECOND, event_time, processing_time) > 60;
{code}
----
h2. Implementation Plan
h3. Affected Components
h4. 1. Parser Layer ({{flink-sql-parser}})
 * Extend {{parserImpls.ftl}} to support a WATERMARK clause in CREATE VIEW
 * Add a {{SqlAlterViewSetWatermark}} AST node for the ALTER VIEW syntax
 * Parse the watermark expression as a {{SqlNode}}

*Files to Modify:*
 * {{flink-sql-parser/src/main/codegen/includes/parserImpls.ftl}}
 * {{flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterViewSetWatermarkOperation.java}} (new)

h4. 2. Planner Layer ({{flink-table-planner}})
 * Implement {{SqlAlterViewSetWatermarkConverter}} to convert AST → Operation
 * Add validation logic in {{OperationConverterUtils}}:
 ** Watermark column exists in view schema
 ** Column type is TIMESTAMP or TIMESTAMP_LTZ
 ** Only one watermark per view
 ** Expression is a valid SQL expression over the view's columns and returns TIMESTAMP or TIMESTAMP_LTZ
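The first three validation rules above could be checked roughly as follows. This is a simplified sketch rather than planner code. It models the view schema as a hypothetical map from column name to type string, and it uses the error messages expected in the negative test cases of this issue. Validating the expression itself requires the SQL validator, so the sketch omits it.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified checks for a view watermark (not planner code).
// The view schema is modeled as column name -> SQL type string, e.g. "TIMESTAMP(3)".
final class ViewWatermarkValidator {
    private static final Set<String> ALLOWED_TYPE_ROOTS = Set.of("TIMESTAMP", "TIMESTAMP_LTZ");

    /** Returns one message per violated rule; an empty list means the definition passes. */
    static List<String> validate(Map<String, String> viewSchema, List<String> watermarkColumns) {
        List<String> errors = new ArrayList<>();
        if (watermarkColumns.size() > 1) {
            errors.add("View already has a watermark definition");
        }
        for (String column : watermarkColumns) {
            String type = viewSchema.get(column);
            if (type == null) {
                errors.add("Column '" + column + "' not found in view schema");
            } else if (!ALLOWED_TYPE_ROOTS.contains(typeRoot(type))) {
                errors.add("Column '" + column + "' must be of type TIMESTAMP or TIMESTAMP_LTZ");
            }
        }
        return errors;
    }

    /** Strips an optional precision: "TIMESTAMP_LTZ(3)" -> "TIMESTAMP_LTZ", "BIGINT" -> "BIGINT". */
    private static String typeRoot(String type) {
        int paren = type.indexOf('(');
        String root = paren < 0 ? type : type.substring(0, paren);
        return root.trim().toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        Map<String, String> schema = Map.of("event_id", "BIGINT", "event_time", "TIMESTAMP(3)");
        System.out.println(validate(schema, List.of("event_time")));       // []
        System.out.println(validate(schema, List.of("non_existent_col"))); // TC-06 message
        System.out.println(validate(schema, List.of("event_id")));         // TC-07 message
    }
}
{code}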

*Files to Modify:*
 * {{flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlAlterViewSetWatermarkConverter.java}} (new)
 * {{flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala}}

h4. 3. Catalog Layer ({{flink-table-common}})
 * Store watermark metadata in {{CatalogView}} options:
{code:java}
import java.util.HashMap;
import java.util.Map;

// Persist the view watermark as two plain string entries in the CatalogView options
Map<String, String> options = new HashMap<>();
options.put("watermark.column", "event_time");
options.put("watermark.strategy", "event_time - INTERVAL '5' SECOND");
{code}

*Files to Modify:*
 * {{flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogView.java}}
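The metadata consists of only two string options, so a view with neither key can be read as a view without a watermark. This is what keeps existing views working unchanged. The following minimal sketch uses only {{java.util}}. The helper {{ViewWatermarkOptions}} and its methods are hypothetical, while the option keys are the ones proposed above. {{withWatermark}} shows how {{ALTER VIEW SET WATERMARK}} could set or replace a definition on a copy of the options.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper around the proposed "watermark.column" / "watermark.strategy" view options.
final class ViewWatermarkOptions {
    static final String COLUMN_KEY = "watermark.column";
    static final String STRATEGY_KEY = "watermark.strategy";

    /** Immutable value holder for one view watermark definition. */
    static final class Watermark {
        private final String column;
        private final String strategy;

        Watermark(String column, String strategy) {
            this.column = column;
            this.strategy = strategy;
        }

        String column() { return column; }

        String strategy() { return strategy; }
    }

    /** Returns a copy of the options with the watermark set or replaced (ALTER VIEW SET WATERMARK). */
    static Map<String, String> withWatermark(Map<String, String> options, Watermark watermark) {
        Map<String, String> updated = new HashMap<>(options);
        updated.put(COLUMN_KEY, watermark.column());
        updated.put(STRATEGY_KEY, watermark.strategy());
        return updated;
    }

    /** Views stored without the keys (e.g. created before this feature) yield Optional.empty(). */
    static Optional<Watermark> readWatermark(Map<String, String> options) {
        String column = options.get(COLUMN_KEY);
        String strategy = options.get(STRATEGY_KEY);
        if (column == null && strategy == null) {
            return Optional.empty();
        }
        if (column == null || strategy == null) {
            throw new IllegalStateException(
                    "Incomplete view watermark metadata: both '" + COLUMN_KEY + "' and '"
                            + STRATEGY_KEY + "' are required");
        }
        return Optional.of(new Watermark(column, strategy));
    }
}
{code}

This sketch rejects a half-written key pair instead of silently ignoring it. That is a deliberate choice: a corrupted catalog entry then surfaces as an error, rather than as a view that silently runs without its intended watermark.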

h4. 4. Execution Layer ({{flink-table-api-java}})
 * Modify {{TableEnvironmentImpl}} to apply the watermark when resolving views
 * Implement watermark propagation during query planning

*Files to Modify:*
 * {{flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java}}

----
h2. Acceptance Criteria
h3. Functional Requirements
 * Users can create views with a watermark using {{CREATE VIEW ... WATERMARK ...}}
 * Users can add or modify a watermark using {{ALTER VIEW SET WATERMARK ...}}
 * Watermark column must exist in the view's output schema (validated at creation time)
 * Watermark column must be of type TIMESTAMP or TIMESTAMP_LTZ (validated at creation time)
 * Watermark expression is validated at view creation/alteration
 * Time window queries on views with a watermark execute correctly
 * Watermark metadata persists across restarts in persistent catalogs (e.g., Hive)
 * {{DESCRIBE <view_name>}} shows watermark information

h3. Non-Functional Requirements
 * No performance regression on existing views without watermark
 * Backward compatible: old views work without modification
 * Watermark metadata works with all catalog implementations that support views (e.g., GenericInMemory, Hive); Flink's built-in JDBC catalog is read-only and cannot store views
 * Clear error messages for invalid watermark definitions

h3. Test Coverage
 * {*}Parser Tests{*}: SQL syntax parsing for CREATE VIEW and ALTER VIEW
 * {*}Converter Tests{*}: AST to Operation conversion
 * {*}Planner Tests{*}: Query planning with view watermarks
 * {*}Integration Tests{*}: E2E with SQL Client and Table API
 * {*}Negative Tests{*}: Invalid column, wrong type, duplicate watermark
 * {*}Catalog Tests{*}: Persistence across sessions
 * {*}Compatibility Tests{*}: Old views still work

----
h2. Test Cases
h3. Positive Test Cases
{code:sql}
-- TC-01: CREATE VIEW with watermark
CREATE VIEW test_view1
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT event_id, event_time FROM source_table;

-- TC-02: ALTER VIEW to add watermark
CREATE VIEW test_view2 AS SELECT event_id, event_time FROM source_table;
ALTER VIEW test_view2 SET WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND;

-- TC-03: Time window query on view
SELECT 
    TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
    COUNT(*) AS event_count
FROM test_view1
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);

-- TC-04: Complex watermark expression
CREATE VIEW test_view3
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND - INTERVAL '500' MILLISECOND
AS SELECT * FROM source_table;

-- TC-05: DESCRIBE VIEW shows watermark
DESCRIBE test_view1;
-- Expected output includes: WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
{code}
h3. Negative Test Cases
{code:sql}
-- TC-06: Watermark column doesn't exist (should fail)
CREATE VIEW test_error1
WATERMARK FOR non_existent_col AS non_existent_col - INTERVAL '5' SECOND
AS SELECT event_id FROM source_table;
-- Expected: "Column 'non_existent_col' not found in view schema"

-- TC-07: Watermark column is not TIMESTAMP type (should fail)
CREATE VIEW test_error2
WATERMARK FOR event_id AS event_id - INTERVAL '5' SECOND
AS SELECT event_id, event_time FROM source_table;
-- Expected: "Column 'event_id' must be of type TIMESTAMP or TIMESTAMP_LTZ"

-- TC-08: Invalid watermark expression (should fail)
CREATE VIEW test_error3
WATERMARK FOR event_time AS invalid_expression
AS SELECT event_id, event_time FROM source_table;
-- Expected: "Invalid watermark expression" (the identifier 'invalid_expression' cannot be resolved)

-- TC-09: ALTER VIEW on non-existent view (should fail)
ALTER VIEW non_existent_view SET WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND;
-- Expected: "View 'non_existent_view' does not exist"

-- TC-10: Duplicate watermark definition (should fail)
CREATE VIEW test_error4
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
AS SELECT event_id, event_time FROM source_table;
-- Expected: "View already has a watermark definition"
{code}
----
h2. Backward Compatibility
h3. No Breaking Changes

✅ *Fully backward compatible*
 * Existing views without watermark continue to work unchanged
 * Watermark is stored as optional metadata in view options
 * Older Flink versions ignore watermark options (graceful degradation)

h3. Migration Path

Users can adopt this feature gradually:
 # {*}Phase 1{*}: Continue using existing views without watermark
 # {*}Phase 2{*}: Create new views with watermark syntax
 # {*}Phase 3{*}: Use {{ALTER VIEW SET WATERMARK}} to add watermarks to existing views incrementally

----
h2. Performance Impact
h3. Expected Impact
 * ✅ *No regression* on views without watermark
 * ✅ *Minimal overhead* for views with a watermark (same as table watermark processing)
 * ✅ *Catalog overhead* negligible (storing 2 strings in options map)

h3. Benchmarking Plan
 * Measure query latency with and without view watermarks
 * Compare catalog read/write performance
 * Validate checkpoint size and recovery time

----
h2. Related Issues
 * FLINK-XXXXX: FLIP-296: Extend watermark-related features for SQL
 * FLINK-XXXXX: Support event time in SQL views (if exists)
 * FLINK-XXXXX: Improve watermark propagation in multi-source queries

----
h2. Documentation Requirements
h3. User Documentation
 * Update SQL {{CREATE VIEW}} documentation page
 * Add new {{ALTER VIEW SET WATERMARK}} documentation
 * Add examples to "Event Time and Watermarks" guide
 * Add troubleshooting section for common errors

h3. Developer Documentation
 * Architecture design document (FLIP or tech doc)
 * Implementation guide for contributors
 * JavaDoc/ScalaDoc for new classes and methods

h2. Open Questions
h3. Q1: Watermark Conflict in Joins

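*Question:* What happens when a view with a watermark is joined with a table that has a different watermark?

*Proposed Answer:* Follow existing Flink watermark propagation rules:
 * For inner and outer joins (LEFT/RIGHT/FULL) alike, the join operator forwards the minimum of its two input watermarks
 * The join type determines which rows are emitted (e.g., when unmatched rows of an interval outer join are emitted), not how watermarks are combined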
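The minimum rule can be sketched as follows. The operator remembers the latest watermark from each input and forwards their minimum, but only when that minimum advances. This is a hypothetical, simplified model, not Flink's operator API, and it does not handle idle inputs. The example assumes a view with a 5-second delay on the left input and a table with a 30-second delay on the right input.

{code:java}
import java.util.OptionalLong;

// Hypothetical model of watermark combination in a two-input operator such as a join
// (no idle-input handling; not Flink's operator API).
final class TwoInputWatermarks {
    private long left = Long.MIN_VALUE;
    private long right = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    /** Records a watermark from input 0 (left) or 1 (right); returns the combined watermark if it advanced. */
    OptionalLong onWatermark(int input, long watermark) {
        if (input == 0) {
            left = Math.max(left, watermark);
        } else {
            right = Math.max(right, watermark);
        }
        long combined = Math.min(left, right);
        if (combined > lastEmitted) {
            lastEmitted = combined;
            return OptionalLong.of(combined);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        TwoInputWatermarks join = new TwoInputWatermarks();
        System.out.println(join.onWatermark(0, 55_000)); // OptionalLong.empty (right side has no watermark yet)
        System.out.println(join.onWatermark(1, 30_000)); // OptionalLong[30000]
        System.out.println(join.onWatermark(0, 60_000)); // OptionalLong.empty (still held back by the right side)
    }
}
{code}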

h3. Q2: Nested View Watermarks

*Question:* How should watermarks be handled when a view is built on another view that has a watermark?

*Proposed Answer:* The child view's watermark overrides the parent's watermark (similar to how views override underlying table schemas).
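Under the proposed override rule, the effective watermark is the one declared closest to the query. The following hypothetical sketch walks from the queried view towards its sources and returns the first definition it finds. It models each view only by an optional watermark expression and the name of the relation it reads from. It assumes that a view without its own definition keeps the inherited one, and it does not model base-table watermarks.

{code:java}
import java.util.Map;
import java.util.Optional;

// Hypothetical model of nested views: each view names the relation it reads from.
final class NestedViewWatermarks {
    /** A view's optional watermark expression and the name of its input relation. */
    static final class ViewDef {
        final Optional<String> watermark;
        final String input;

        ViewDef(Optional<String> watermark, String input) {
            this.watermark = watermark;
            this.input = input;
        }
    }

    /**
     * Walks from the queried view towards its source; the first watermark found wins.
     * Assumes the view graph is acyclic.
     */
    static Optional<String> effectiveWatermark(Map<String, ViewDef> views, String name) {
        String current = name;
        while (views.containsKey(current)) {
            ViewDef view = views.get(current);
            if (view.watermark.isPresent()) {
                return view.watermark;
            }
            current = view.input;
        }
        // Reached a base table; table-level watermarks are outside this sketch.
        return Optional.empty();
    }
}
{code}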
h3. Q3: ALTER VIEW DROP WATERMARK

*Question:* Should we support removing watermark from views?

*Proposed Answer:* Defer to a later phase (future enhancement). The initial implementation focuses on adding and updating watermarks only.
----
h2. Alternative Approaches Considered
h3. Alternative 1: Query Hints
{code:sql}
SELECT /*+ WATERMARK('event_time', INTERVAL '5' SECOND) */ * FROM my_view;
{code}
❌ {*}Rejected{*}: Not reusable, must be specified in every query
h3. Alternative 2: Table Functions
{code:sql}
SELECT * FROM TABLE(with_watermark(my_view, 'event_time', INTERVAL '5' SECOND));
{code}
❌ {*}Rejected{*}: Verbose, unintuitive, doesn't leverage existing SQL syntax
h3. Alternative 3: Materialized Views with Watermark
{code:sql}
CREATE MATERIALIZED VIEW mv WATERMARK FOR ... AS SELECT ...;
{code}
❌ {*}Rejected{*}: Too heavyweight (requires physical storage) and doesn't solve the abstraction problem

*Conclusion:* View-level watermark definition is the cleanest approach.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
