[ 
https://issues.apache.org/jira/browse/FLINK-35336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-35336:
-----------------------------------
    Issue Type: New Feature  (was: Bug)

> SQL failed to restore from savepoint after change in default-parallelism
> ------------------------------------------------------------------------
>
>                 Key: FLINK-35336
>                 URL: https://issues.apache.org/jira/browse/FLINK-35336
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.1
>         Environment: Flink SQL Client, Flink 1.18.1 on MacOS
>            Reporter: Keith Lee
>            Priority: Major
>
> After raising 'table.exec.resource.default-parallelism' from 1 to 4, I 
> observe the following exception when restoring the job from a savepoint with 
> an unmodified statement set. 
>  
> {quote}[ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint 
> file:/tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff.
>  Cannot map checkpoint/savepoint state for operator 
> 46ba9b22862c3bbe9373c6abee964b2a to the new program, because the operator is 
> not available in the new program. If you want to allow to skip this, you can 
> set the --allowNonRestoredState option on the CLI.
> {quote}
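> As a stopgap, the restore can be told to skip state it cannot map. The 
> following is a minimal sketch, assuming that 
> 'execution.savepoint.ignore-unclaimed-state' is the configuration counterpart 
> of --allowNonRestoredState in Flink 1.18 and that the SQL Client honours it. 
> It does not fix the job graph mismatch: any savepoint state that cannot be 
> mapped to the new program is dropped without further notice.
> {quote}SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';
> -- Assumption: skips unmappable savepoint state instead of failing the restore
> SET 'execution.savepoint.ignore-unclaimed-state' = 'true';
> SET 'table.exec.resource.default-parallelism' = '4';
> -- then re-run the unmodified EXECUTE STATEMENT SET
> {quote}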
> When started without a savepoint, the job graphs differ even though the 
> statements are identical.
> There are two operator chains when default parallelism is 1.
> {quote}A: Source: UserBehaviourKafkaSource[68] -> (Calc[69] -> 
> StreamRecordTimestampInserter[70] -> StreamingFileWriter -> Sink: end, 
> Calc[71] -> LocalWindowAggregate[72])
> B: GlobalWindowAggregate[74] -> Calc[75] -> Sink: CampaignAggregationsJDBC[76]
> {quote}
> There are three operator chains when default parallelism is 4.
> {quote}A: Source: UserBehaviourKafkaSource[86] -> (Calc[87] -> 
> StreamRecordTimestampInserter[88] -> StreamingFileWriter, Calc[89] -> 
> LocalWindowAggregate[90]) 
> B: Sink: end 
> C: GlobalWindowAggregate[92] -> Calc[93] -> Sink: CampaignAggregationsJDBC[94]
> {quote}
>  
> Notice that the operator 'Sink: end' is split into its own chain when 
> parallelism is set to 4, which makes the job graphs incompatible. EXPLAIN PLAN 
> showed no difference in the syntax tree, physical plan, or execution plan.
> I have tried various `table.optimizer.*` configurations.
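> To compare per-operator parallelism rather than only the plan shape, the 
> JSON execution plan could be requested under both settings and diffed. This 
> is a sketch, assuming EXPLAIN in 1.18 accepts the JSON_EXECUTION_PLAN detail 
> together with a statement set; whether the output lists 'Sink: end' with its 
> own parallelism is not confirmed here. The INSERT is the first statement of 
> the statement set from the reproduction steps.
> {quote}SET 'table.exec.resource.default-parallelism' = '4';
> -- Assumption: JSON_EXECUTION_PLAN prints the stream graph nodes with their parallelism
> EXPLAIN JSON_EXECUTION_PLAN
> STATEMENT SET BEGIN
>     INSERT INTO UserErrorExperienceS3Sink
>     SELECT
>         user_id,
>         user_session,
>         interaction_type,
>         interaction_target,
>         interaction_tags,
>         DATE_FORMAT(event_time, 'yyyy-MM-dd'),
>         DATE_FORMAT(event_time, 'HH'),
>         event_time
>     FROM UserBehaviourKafkaSource
>     WHERE interaction_result LIKE '%ERROR%';
> END;
> {quote}
> Running the same statement with the parallelism set to '1' and comparing the 
> two outputs may show which node keeps a fixed parallelism.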
> Steps to reproduce
> {quote}SET 'table.exec.resource.default-parallelism' = '1';
> EXECUTE STATEMENT SET BEGIN 
>     INSERT INTO UserErrorExperienceS3Sink (user_id, user_session, 
> interaction_type, interaction_target, interaction_tags, event_date, 
> event_hour, event_time)
>     SELECT
>         user_id, 
>         user_session,
>         interaction_type,
>         interaction_target,
>         interaction_tags, 
>         DATE_FORMAT(event_time , 'yyyy-MM-dd'),
>         DATE_FORMAT(event_time , 'HH'),
>         event_time 
>     FROM UserBehaviourKafkaSource 
>     WHERE 
>         interaction_result LIKE '%ERROR%'; 
>     INSERT INTO CampaignAggregationsJDBC 
>     SELECT 
>         CONCAT_WS('/', interaction_tags, interaction_result, 
> DATE_FORMAT(window_start, 'yyyy-MM-dd HH:mm:ss.SSS'), DATE_FORMAT(window_end, 
> 'yyyy-MM-dd HH:mm:ss.SSS')) AS id, 
>         interaction_tags as campaign, 
>         interaction_result, 
>         COUNT(*) AS interaction_count, 
>         window_start, 
>         window_end 
>     FROM 
>         TABLE(TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time), 
> INTERVAL '10' SECONDS)) 
>     GROUP BY window_start, window_end, interaction_tags, interaction_result; 
> END;
> STOP JOB '<JOB_ID>' WITH SAVEPOINT;
> SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';
> SET 'table.exec.resource.default-parallelism' = '4';
> <Re-run DML at line 2>
> {quote}
> DDLs
> {quote}-- S3 Sink
> CREATE TABLE UserErrorExperienceS3Sink (
>   user_id BIGINT,
>   user_session STRING,
>   interaction_type STRING,
>   interaction_target STRING,
>   interaction_tags STRING,
>   event_date STRING,
>   event_hour STRING,
>   event_time TIMESTAMP(3) WITHOUT TIME ZONE)
> PARTITIONED BY (event_date, event_hour)
> WITH (
>   'connector' = 'filesystem',
>   'path' = 's3://<S3BUCKET>/userErrorExperience/',
>   'format' = 'json');
> -- Kafka Source
> ADD JAR 
> 'file:///Users/leekei/Downloads/flink-sql-connector-kafka-3.1.0-1.18.jar';
> CREATE TABLE UserBehaviourKafkaSource (
>   user_id BIGINT,
>   user_session STRING,
>   interaction_type STRING,
>   interaction_target STRING,
>   interaction_tags STRING,
>   interaction_result STRING,
>   event_time TIMESTAMP(3) WITHOUT TIME ZONE METADATA FROM 'timestamp',
>   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
> WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behaviour',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'demoGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'csv');
> -- PostgreSQL Source/Sink 
> ADD JAR 'file:///Users/leekei/Downloads/flink-connector-jdbc-3.1.2-1.18.jar';
> ADD JAR 'file:///Users/leekei/Downloads/postgresql-42.7.3.jar';
> CREATE TABLE CampaignAggregationsJDBC (
>   id STRING,
>   campaign STRING,
>   interaction_result STRING,
>   interaction_count BIGINT,
>   window_start TIMESTAMP(3) WITHOUT TIME ZONE,
>   window_end TIMESTAMP(3) WITHOUT TIME ZONE)
> WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:postgresql://localhost:5432/postgres',
>   'table-name' = 'campaign_aggregations');
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
