[ 
https://issues.apache.org/jira/browse/FLINK-15545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-15545:
-----------------------------
    Description: 
Currently Flink DDL mixes three types of params all together:
 * External data’s metadata: defines what the data looks like (schema), where 
it is (location/url), how it should be accessed (username/pwd)
 * Source/sink runtime params: define how, and usually how fast, a Flink 
source/sink reads/writes data, without affecting the results
 ** Kafka “sink-partitioner”
 ** Elastic “bulk-flush.interval/max-size/...”
 * Semantics params: define aspects like how much data Flink reads/writes and 
what the result will look like
 ** Kafka “startup-mode”, “offset”
 ** Watermark, timestamp column

Problems of the current mix-up: with all the non-metadata params involved in 
DDL, Flink cannot run queries using catalogs and external system metadata 
alone. E.g. when we add a catalog for Confluent Schema Registry, the expected 
user experience is that Flink users just configure the catalog with url and 
usr/pwd and can run queries immediately. That’s not the case right now, because 
users still have to use DDL to define a bunch of params like “startup-mode”, 
“offset”, timestamp column, etc., and to repeat the schema redundantly. We’ve 
heard many user complaints about this.
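
To make the mix-up concrete, here is a rough sketch of what such a DDL can look 
like today. The table name, columns, topic and broker address are made up, and 
the property keys follow the Flink 1.10-era Kafka connector as best I recall, 
so treat it as an illustration rather than a complete, validated definition.

```sql
-- Illustrative only: names and values are hypothetical.
CREATE TABLE user_clicks (
  -- external data's metadata: the schema
  user_id BIGINT,
  url STRING,
  ts TIMESTAMP(3),
  -- semantics param: watermark on the timestamp column
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  -- external data's metadata: where the data is and what it looks like
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user_clicks',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'format.type' = 'json',
  -- runtime param: how the sink writes, not affecting the results
  'connector.sink-partitioner' = 'round-robin',
  -- semantics param: how much data is read
  'connector.startup-mode' = 'earliest-offset'
);
```

Only the schema, topic, address and format could come from a catalog; the rest 
is per-job configuration that ends up baked into the table definition.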

Solution:
 * The metadata defines external data and is mostly already available in 
external systems; Flink can leverage it via Flink Catalogs.
 * The latter two are Flink-specific, have nothing to do with the external 
system, and mostly differ from query to query in Flink. E.g. users usually 
want to execute jobs by reading from different starting positions of a Kafka 
topic, and different jobs may write to the same sink at various paces due to 
different latency requirements. Thus they should not be part of Flink DDL.
 ** Runtime params can be in table hints (e.g. INSERT INTO x SELECT * FROM Y 
WITH(k1=v1, k2=v2)), or configured through context/session params like “SET 
k1=v1; SET k2=v2; INSERT INTO x SELECT * FROM Y;”
 *** One problem with table hints is that they hinder the true unification of 
the batch-streaming codebase for users. Flink claims to have unified 
batch-streaming, but only at the stack/infra level, not at the user codebase 
level. Users still face the legacy issue of the lambda architecture: having to 
author both a batch job and a streaming job, and keep their business logic in 
sync. What users really want is a single piece of business logic code that can 
run in both batch and streaming. In this case, session params + dynamic 
catalog table (https://issues.apache.org/jira/browse/FLINK-15206) together can 
probably make big progress. Needs evaluation.
 ** Semantics params defining how much data to read should be filters in the 
WHERE clause. E.g. SELECT * FROM X WHERE ‘start-position’ = ‘2020-01-01’ 
 ** Watermark: TBD
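
As a rough sketch of how a job might look under this proposal: none of the 
syntax below is settled. The table names es_sink and kafka_source, the session 
key and its value, and the `start-position` pseudo-column are all hypothetical, 
and the tables are assumed to be provided by a catalog rather than by DDL.

```sql
-- Hypothetical syntax sketch, not an existing Flink feature.

-- Runtime param supplied as a session param instead of in DDL
-- (key taken from the Elastic example above; the value is made up).
SET bulk-flush.interval=1000;

-- Semantics param (how much data to read) expressed as a WHERE filter.
INSERT INTO es_sink
SELECT * FROM kafka_source
WHERE `start-position` = '2020-01-01';
```

The same statement could then be pointed at a different starting position or 
flush interval without touching any table definition.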

Result: Only rich catalogs for external systems + enhanced DML can provide 
Flink SQL users with a quick-start, out-of-the-box experience.

> Separate runtime params and semantics params from Flink DDL for easier 
> integration with catalogs and better user experience
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15545
>                 URL: https://issues.apache.org/jira/browse/FLINK-15545
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
