Hi Keith Lee,

Yes, a FLIP will be required.
Best regards,

Martijn

On Mon, May 13, 2024 at 2:51 PM Keith Lee <leekeiabstract...@gmail.com> wrote:

> Thank you Martijn for confirming and switching the Jira to New Feature.
>
> I intend to explore approaches to implementing the feature to allow for:
>
> 1. Configurations that will make Flink SQL job restore robust to
> parallelism changes
> 2. Configurations that will allow best-effort Flink SQL job restore after
> Flink statement changes.
>
> How best should such a feature request be driven? Will a FLIP be necessary?
>
> Best regards
> Keith Lee
>
>
> On Mon, May 13, 2024 at 1:27 PM Martijn Visser (Jira) <j...@apache.org> wrote:
>
> > [ https://issues.apache.org/jira/browse/FLINK-35336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
> >
> > Martijn Visser updated FLINK-35336:
> > -----------------------------------
> >     Issue Type: New Feature  (was: Bug)
> >
> > > SQL failed to restore from savepoint after change in default-parallelism
> > > ------------------------------------------------------------------------
> > >
> > >              Key: FLINK-35336
> > >              URL: https://issues.apache.org/jira/browse/FLINK-35336
> > >          Project: Flink
> > >       Issue Type: New Feature
> > >       Components: Table SQL / Planner
> > > Affects Versions: 1.18.1
> > >      Environment: Flink SQL Client, Flink 1.18.1 on macOS
> > >         Reporter: Keith Lee
> > >         Priority: Major
> > >
> > > After bumping 'table.exec.resource.default-parallelism' from 1 to 4, I
> > > am observing the following exception on restoring the job from a
> > > savepoint with an unmodified statement set.
> > >
> > > {quote}[ERROR] Could not execute SQL statement. Reason:
> > > java.lang.IllegalStateException: Failed to rollback to
> > > checkpoint/savepoint file:/tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff.
> > > Cannot map checkpoint/savepoint state for operator
> > > 46ba9b22862c3bbe9373c6abee964b2a to the new program, because the operator
> > > is not available in the new program. If you want to allow to skip this, you
> > > can set the --allowNonRestoredState option on the CLI.
> > > {quote}
> > >
> > > When started without savepoints, the job graphs differ for the jobs
> > > despite identical statements being run.
> > >
> > > There are two operators when default parallelism is 1:
> > > {quote}A: Source: UserBehaviourKafkaSource[68] -> (Calc[69] ->
> > > StreamRecordTimestampInserter[70] -> StreamingFileWriter -> Sink: end,
> > > Calc[71] -> LocalWindowAggregate[72])
> > > B: GlobalWindowAggregate[74] -> Calc[75] -> Sink:
> > > CampaignAggregationsJDBC[76]
> > > {quote}
> > >
> > > Three operators when default parallelism is 4:
> > > {quote}A: Source: UserBehaviourKafkaSource[86] -> (Calc[87] ->
> > > StreamRecordTimestampInserter[88] -> StreamingFileWriter, Calc[89] ->
> > > LocalWindowAggregate[90])
> > > B: Sink: end
> > > C: GlobalWindowAggregate[92] -> Calc[93] -> Sink:
> > > CampaignAggregationsJDBC[94]
> > > {quote}
> > >
> > > Notice that the operator 'Sink: end' is separated out when parallelism
> > > is set to 4, causing the incompatibility in the job graph. EXPLAIN PLAN
> > > did not show any difference between the syntax tree, physical plan or
> > > execution plan. I have attempted various configurations in
> > > `table.optimizer.*`.
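[Editor's note: for readers who only need to get the job running again, the CLI flag named in the error has a configuration counterpart usable from the SQL client. A minimal sketch, assuming Flink 1.18 where `execution.savepoint.ignore-unclaimed-state` is the config equivalent of `--allowNonRestoredState`; note it silently discards the state of the unmatched operator rather than restoring it:]

```sql
-- Point the client at the savepoint from the error message.
SET 'execution.savepoint.path' = '/tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff';

-- Tolerate savepoint state that no longer maps onto the new job graph
-- (equivalent to passing --allowNonRestoredState on the CLI).
SET 'execution.savepoint.ignore-unclaimed-state' = 'true';

-- Then re-run the original statement set; state for the re-split
-- 'Sink: end' operator is dropped instead of failing the restore.
```

This is a workaround, not a fix: any state held by the unmapped operator is lost, which is exactly why the issue asks for restores that are robust to parallelism changes.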
> > > Steps to reproduce:
> > > {quote}SET 'table.exec.resource.default-parallelism' = '1';
> > >
> > > EXECUTE STATEMENT SET
> > > BEGIN
> > >   INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
> > >     interaction_type, interaction_target, interaction_tags, event_date,
> > >     event_hour, event_time)
> > >   SELECT
> > >     user_id,
> > >     user_session,
> > >     interaction_type,
> > >     interaction_target,
> > >     interaction_tags,
> > >     DATE_FORMAT(event_time, 'yyyy-MM-dd'),
> > >     DATE_FORMAT(event_time, 'HH'),
> > >     event_time
> > >   FROM UserBehaviourKafkaSource
> > >   WHERE interaction_result LIKE '%ERROR%';
> > >
> > >   INSERT INTO CampaignAggregationsJDBC
> > >   SELECT
> > >     CONCAT_WS('/', interaction_tags, interaction_result,
> > >       DATE_FORMAT(window_start, 'YYYY-MM-DD HH:mm:ss.SSS'),
> > >       DATE_FORMAT(window_end, 'YYYY-MM-DD HH:mm:ss.SSS')) AS id,
> > >     interaction_tags AS campaign,
> > >     interaction_result,
> > >     COUNT(*) AS interaction_count,
> > >     window_start,
> > >     window_end
> > >   FROM TABLE(TUMBLE(TABLE UserBehaviourKafkaSource,
> > >     DESCRIPTOR(event_time), INTERVAL '10' SECONDS))
> > >   GROUP BY window_start, window_end, interaction_tags, interaction_result;
> > > END;
> > >
> > > STOP JOB '<JOB_ID>' WITH SAVEPOINT;
> > > SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';
> > > SET 'table.exec.resource.default-parallelism' = '4';
> > > <Re-run DML at line 2>
> > > {quote}
> > >
> > > DDLs:
> > > {quote}-- S3 Sink
> > > CREATE TABLE UserErrorExperienceS3Sink (
> > >   user_id BIGINT,
> > >   user_session STRING,
> > >   interaction_type STRING,
> > >   interaction_target STRING,
> > >   interaction_tags STRING,
> > >   event_date STRING,
> > >   event_hour STRING,
> > >   event_time TIMESTAMP(3) WITHOUT TIME ZONE)
> > > PARTITIONED BY (event_date, event_hour)
> > > WITH (
> > >   'connector' = 'filesystem',
> > >   'path' = 's3://<S3BUCKET>/userErrorExperience/',
> > >   'format' = 'json');
> > >
> > > -- Kafka Source
> > > ADD JAR 'file:///Users/leekei/Downloads/flink-sql-connector-kafka-3.1.0-1.18.jar';
> > >
> > > CREATE TABLE UserBehaviourKafkaSource (
> > >   user_id BIGINT,
> > >   user_session STRING,
> > >   interaction_type STRING,
> > >   interaction_target STRING,
> > >   interaction_tags STRING,
> > >   interaction_result STRING,
> > >   event_time TIMESTAMP(3) WITHOUT TIME ZONE METADATA FROM 'timestamp',
> > >   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
> > > WITH (
> > >   'connector' = 'kafka',
> > >   'topic' = 'user_behaviour',
> > >   'properties.bootstrap.servers' = 'localhost:9092',
> > >   'properties.group.id' = 'demoGroup',
> > >   'scan.startup.mode' = 'latest-offset',
> > >   'format' = 'csv');
> > >
> > > -- PostgreSQL Source/Sink
> > > ADD JAR 'file:///Users/leekei/Downloads/flink-connector-jdbc-3.1.2-1.18.jar';
> > > ADD JAR 'file:///Users/leekei/Downloads/postgresql-42.7.3.jar';
> > >
> > > CREATE TABLE CampaignAggregationsJDBC (
> > >   id STRING,
> > >   campaign STRING,
> > >   interaction_result STRING,
> > >   interaction_count BIGINT,
> > >   window_start TIMESTAMP(3) WITHOUT TIME ZONE,
> > >   window_end TIMESTAMP(3) WITHOUT TIME ZONE)
> > > WITH (
> > >   'connector' = 'jdbc',
> > >   'url' = 'jdbc:postgresql://localhost:5432/postgres',
> > >   'table-name' = 'campaign_aggregations');
> > > {quote}
> >
> > --
> > This message was sent by Atlassian Jira
> > (v8.20.10#820010)
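[Editor's note: the robustness Keith proposes to explore overlaps with Flink's existing compiled-plan workflow (FLIP-190), where the physical plan, including operator IDs, is persisted to JSON so that later runs keep a stable job graph. A minimal sketch against the DDLs above, assuming the 1.18 SQL client and a writable /tmp path; the DML bodies are elided with `...` and whether this fully covers the parallelism re-chaining seen here would need verifying:]

```sql
-- Compile the statement set once, persisting the physical plan and its
-- operator IDs to JSON instead of executing it directly.
COMPILE PLAN 'file:///tmp/campaign-plan.json' FOR STATEMENT SET
BEGIN
  INSERT INTO UserErrorExperienceS3Sink SELECT ... ;
  INSERT INTO CampaignAggregationsJDBC SELECT ... ;
END;

-- Later sessions execute the persisted plan; operator identity is read
-- from the JSON, so savepoint state can be remapped across restarts.
SET 'table.exec.resource.default-parallelism' = '4';
EXECUTE PLAN 'file:///tmp/campaign-plan.json';
```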