Hi Keith Lee,

Yes, a FLIP will be required.

Best regards,

Martijn

On Mon, May 13, 2024 at 2:51 PM Keith Lee <leekeiabstract...@gmail.com>
wrote:

> Thank you Martijn for confirming and switching the Jira to New Feature.
>
> I intend to explore approaches to implementing the feature to allow for:
>
> 1. Configurations that will make Flink SQL job restore robust to
> parallelism changes
> 2. Configurations that will allow best-effort Flink SQL job restore after
> Flink statement changes.
>
> How best should such a feature request be driven? Will a FLIP be necessary?
>
> Best regards
> Keith Lee
>
>
> On Mon, May 13, 2024 at 1:27 PM Martijn Visser (Jira) <j...@apache.org>
> wrote:
>
> >
> >      [ https://issues.apache.org/jira/browse/FLINK-35336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
> >
> > Martijn Visser updated FLINK-35336:
> > -----------------------------------
> >     Issue Type: New Feature  (was: Bug)
> >
> > > SQL failed to restore from savepoint after change in default-parallelism
> > > ------------------------------------------------------------------------
> > >
> > >                 Key: FLINK-35336
> > >                 URL: https://issues.apache.org/jira/browse/FLINK-35336
> > >             Project: Flink
> > >          Issue Type: New Feature
> > >          Components: Table SQL / Planner
> > >    Affects Versions: 1.18.1
> > >         Environment: Flink SQL Client, Flink 1.18.1 on MacOS
> > >            Reporter: Keith Lee
> > >            Priority: Major
> > >
> > > After bumping 'table.exec.resource.default-parallelism' from 1 to 4, I
> > > am observing the following exception when restoring the job from a
> > > savepoint with an unmodified statement set.
> > >
> > > {quote}[ERROR] Could not execute SQL statement. Reason:
> > > java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint
> > > [file:/tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff|file:///tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff].
> > > Cannot map checkpoint/savepoint state for operator
> > > 46ba9b22862c3bbe9373c6abee964b2a to the new program, because the operator
> > > is not available in the new program. If you want to allow to skip this, you
> > > can set the --allowNonRestoredState option on the CLI.
> > > {quote}
> > > When started without savepoints, the jobgraph differs for the jobs
> > > despite identical statements being run.
> > > There are 2 operators when default parallelism is 1.
> > > {quote}A: Source: UserBehaviourKafkaSource[68] -> (Calc[69] ->
> > > StreamRecordTimestampInserter[70] -> StreamingFileWriter -> Sink: end,
> > > Calc[71] -> LocalWindowAggregate[72])
> > > B: GlobalWindowAggregate[74] -> Calc[75] -> Sink: CampaignAggregationsJDBC[76]
> > > {quote}
> > > Three operators when default parallelism is 4.
> > > {quote}A: Source: UserBehaviourKafkaSource[86] -> (Calc[87] ->
> > > StreamRecordTimestampInserter[88] -> StreamingFileWriter, Calc[89] ->
> > > LocalWindowAggregate[90])
> > > B: Sink: end
> > > C: GlobalWindowAggregate[92] -> Calc[93] -> Sink: CampaignAggregationsJDBC[94]
> > > {quote}
> > >
> > > Notice that the operator 'Sink: end' is separated out when parallelism
> > > is set to 4, causing the incompatibility in job graph. EXPLAIN PLAN did not
> > > show any difference between syntax tree, physical plan or execution plan.
> > > I have attempted various configurations in `table.optimizer.*`.
> > > Steps to reproduce
> > > {quote}SET 'table.exec.resource.default-parallelism' = '1';
> > > EXECUTE STATEMENT SET BEGIN
> > >     INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
> > >         interaction_type, interaction_target, interaction_tags, event_date,
> > >         event_hour, event_time)
> > >     SELECT
> > >         user_id,
> > >         user_session,
> > >         interaction_type,
> > >         interaction_target,
> > >         interaction_tags,
> > >         DATE_FORMAT(event_time , 'yyyy-MM-dd'),
> > >         DATE_FORMAT(event_time , 'HH'),
> > >         event_time
> > >     FROM UserBehaviourKafkaSource
> > >     WHERE
> > >         interaction_result Like '%ERROR%';
> > >     INSERT INTO CampaignAggregationsJDBC
> > >     SELECT
> > >         CONCAT_WS('/', interaction_tags, interaction_result,
> > >             DATE_FORMAT(window_start, 'YYYY-MM-DD HH:mm:ss.SSS'),
> > >             DATE_FORMAT(window_end, 'YYYY-MM-DD HH:mm:ss.SSS')) AS id,
> > >         interaction_tags as campaign,
> > >         interaction_result,
> > >         COUNT(*) AS interaction_count,
> > >         window_start,
> > >         window_end
> > >     FROM
> > >         TABLE(TUMBLE(TABLE UserBehaviourKafkaSource,
> > >             DESCRIPTOR(event_time), INTERVAL '10' SECONDS))
> > >     GROUP BY window_start, window_end, interaction_tags,
> > >         interaction_result;
> > > END;
> > > STOP JOB '<JOB_ID>' WITH SAVEPOINT;
> > > SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';
> > > SET 'table.exec.resource.default-parallelism' = '4';
> > > <Re-run DML at line 2>
> > > {quote}
> > > DDLs
> > > {quote}-- S3 Sink
> > > CREATE TABLE UserErrorExperienceS3Sink (
> > >   user_id BIGINT,
> > >   user_session STRING,
> > >   interaction_type STRING,
> > >   interaction_target STRING,
> > >   interaction_tags STRING,
> > >   event_date STRING,
> > >   event_hour STRING,
> > >   event_time TIMESTAMP(3) WITHOUT TIME ZONE)
> > > PARTITIONED BY (event_date, event_hour)
> > > WITH (
> > >   'connector' = 'filesystem',
> > >   'path' = 's3://<S3BUCKET>/userErrorExperience/',
> > >   'format' = 'json');
> > > -- Kafka Source
> > > ADD JAR 'file:///Users/leekei/Downloads/flink-sql-connector-kafka-3.1.0-1.18.jar';
> > > CREATE TABLE UserBehaviourKafkaSource (
> > >   user_id BIGINT,
> > >   user_session STRING,
> > >   interaction_type STRING,
> > >   interaction_target STRING,
> > >   interaction_tags STRING,
> > >   interaction_result STRING,
> > >   event_time TIMESTAMP(3) WITHOUT TIME ZONE METADATA FROM 'timestamp',
> > >   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
> > > WITH (
> > >   'connector' = 'kafka',
> > >   'topic' = 'user_behaviour',
> > >   'properties.bootstrap.servers' = 'localhost:9092',
> > >   'properties.group.id' = 'demoGroup',
> > >   'scan.startup.mode' = 'latest-offset',
> > >   'format' = 'csv');
> > > -- PostgreSQL Source/Sink
> > > ADD JAR 'file:///Users/leekei/Downloads/flink-connector-jdbc-3.1.2-1.18.jar';
> > > ADD JAR 'file:///Users/leekei/Downloads/postgresql-42.7.3.jar';
> > > CREATE TABLE CampaignAggregationsJDBC (
> > >   id STRING,
> > >   campaign STRING,
> > >   interaction_result STRING,
> > >   interaction_count BIGINT,
> > >   window_start TIMESTAMP(3) WITHOUT TIME ZONE,
> > >   window_end TIMESTAMP(3) WITHOUT TIME ZONE)
> > > WITH (
> > >   'connector' = 'jdbc',
> > >   'url' = 'jdbc:postgresql://localhost:5432/postgres',
> > >   'table-name' = 'campaign_aggregations');
> > > {quote}
> >
> >
> >
> > --
> > This message was sent by Atlassian Jira
> > (v8.20.10#820010)
> >
>
