[ 
https://issues.apache.org/jira/browse/FLINK-39673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Han Yin updated FLINK-39673:
----------------------------
    Description: 
h1. Background

Since Flink 1.20, the state recovery configuration keys were renamed:
||Old Key (deprecated)||New Key||
|execution.savepoint-restore-mode|execution.state-recovery.claim-mode|

The new keys are defined in `StateRecoveryOptions`, with `withDeprecatedKeys(...)` pointing to the old keys:
{code:java}
// StateRecoveryOptions.java
public static final ConfigOption<RestoreMode> RESTORE_MODE =
    key("execution.state-recovery.claim-mode")
        .enumType(RestoreMode.class)
        .defaultValue(RestoreMode.DEFAULT)  // DEFAULT = NO_CLAIM
        .withDeprecatedKeys("execution.savepoint-restore-mode")
        ...{code}
Flink's `withDeprecatedKeys` mechanism follows these resolution rules (see [FLIP-406|https://issues.apache.org/jira/browse/FLINK-34255] for details):
 * If the user configures *only the new key* (`execution.state-recovery.claim-mode`), it takes effect.
 * If the user configures *only the old (deprecated) key* (`execution.savepoint-restore-mode`), it also takes effect: `withDeprecatedKeys` preserves backward compatibility by falling back to the old key.
 * If *both* keys are configured, the *new key takes precedence* over the deprecated one.
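The lookup order described by these rules can be modeled in a few lines. This is a hedged, self-contained sketch, not Flink's `Configuration` code: the configuration is assumed to be a plain `Map<String, String>`, and `DeprecatedKeyResolver` / `resolve` are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical model of the withDeprecatedKeys lookup order; not Flink's actual implementation. */
final class DeprecatedKeyResolver {

    private DeprecatedKeyResolver() {}

    /**
     * Returns the value stored under newKey if present, otherwise the value of the first
     * deprecated key that is present, otherwise defaultValue.
     */
    static String resolve(
            Map<String, String> conf,
            String newKey,
            List<String> deprecatedKeys,
            String defaultValue) {
        // Rule 1 and 3: the new key wins whenever it is present.
        String value = conf.get(newKey);
        if (value != null) {
            return value;
        }
        // Rule 2: fall back to the deprecated keys, in declaration order.
        for (String oldKey : deprecatedKeys) {
            value = conf.get(oldKey);
            if (value != null) {
                return value;
            }
        }
        // Neither key is set: use the option's default.
        return defaultValue;
    }

    public static void main(String[] args) {
        String newKey = "execution.state-recovery.claim-mode";
        List<String> oldKeys = List.of("execution.savepoint-restore-mode");

        // Only the new key is set.
        System.out.println(resolve(Map.of(newKey, "CLAIM"), newKey, oldKeys, "NO_CLAIM")); // CLAIM
        // Only the deprecated key is set.
        System.out.println(resolve(Map.of(oldKeys.get(0), "CLAIM"), newKey, oldKeys, "NO_CLAIM")); // CLAIM
        // Both keys are set: the new key takes precedence.
        System.out.println(
                resolve(Map.of(newKey, "NO_CLAIM", oldKeys.get(0), "CLAIM"), newKey, oldKeys, "NO_CLAIM")); // NO_CLAIM
    }
}
```

The third call shows the precedence rule: once any value is present under the new key, the deprecated key is never consulted.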

h1. Problem and Steps to Reproduce

Start a Flink standalone cluster.

Submit a SQL job restoring from a checkpoint. For example:
{code:sql}
SET 'execution.savepoint.path' = 'hdfs:///checkpoints/xxx/chk-xx';
SET 'execution.state-recovery.claim-mode' = 'CLAIM';

CREATE TABLE source_table (
    word STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.word.length' = '10'
);

CREATE TABLE sink_table (
    word       STRING,
    word_count BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO sink_table
SELECT word, COUNT(*) AS word_count
FROM source_table
GROUP BY word; {code}
The SQL above sets the claim mode using the {*}new key{*}. In this case, the job restores in *CLAIM* mode as expected.

Now change the second `SET` statement to use the *deprecated key* instead:
{code:sql}
SET 'execution.savepoint-restore-mode' = 'CLAIM';{code}
{*}Expected{*}: The job should still restore in *CLAIM* mode. Per the `withDeprecatedKeys` contract, the old key should be honored when the new key is not explicitly set.

{*}Actual{*}: The job restores in *NO_CLAIM* mode (the default value), which directly contradicts what the user explicitly specified via the SQL `SET` statement. The practical consequence is that Flink does *not* claim ownership of the restored checkpoint. As the job keeps running and produces newer checkpoints that subsume the old ones, the checkpoint the job was restored from is *never cleaned up* and remains orphaned on HDFS indefinitely, wasting storage.
h1. Cause: A user-configured value can be overridden by the default value

When a SQL job is submitted through SQL Gateway, the following happens:
h2. Step 1: Command-line parsing creates `SavepointRestoreSettings` with defaults

`CliFrontendParser.createSavepointRestoreSettings()` is called. When the user does *not* provide `-claimMode` or `-restoreMode` on the command line, the method falls into the `else` branch and uses the default value:
{code:java}
// CliFrontendParser.java
public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLine commandLine) {
    if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
        ...
        if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) {
            restoreMode = ...;
        } else if (commandLine.hasOption(SAVEPOINT_RESTORE_MODE)) {
            restoreMode = ...;
        } else {
            // Neither option was given on the command line: substitute the default.
            restoreMode = StateRecoveryOptions.RESTORE_MODE.defaultValue(); // NO_CLAIM
        }
        return SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState, restoreMode);
    }
    ...
} {code}
In the `else` branch, the resulting `SavepointRestoreSettings` therefore carries a concrete restore mode (`NO_CLAIM`) even though the user never specified one, and nothing records that this value is only a default.
h2. Step 2: `toConfiguration()` unconditionally writes default values using the new key
{code:java}
// SavepointRestoreSettings.java
public static void toConfiguration(
        final SavepointRestoreSettings savepointRestoreSettings,
        final Configuration configuration) {
    configuration.set(
            StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, // always set
            savepointRestoreSettings.allowNonRestoredState());
    configuration.set(
            StateRecoveryOptions.RESTORE_MODE, // always set, under the new key
            savepointRestoreSettings.getRestoreMode());
    final String savepointPath = savepointRestoreSettings.getRestorePath();
    if (savepointPath != null) { // only set when non-null
        configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath);
    }
} {code}
Note the asymmetry: `SAVEPOINT_PATH` is set conditionally (only when non-null), but `RESTORE_MODE` and `SAVEPOINT_IGNORE_UNCLAIMED_STATE` are *always* set, even when the user never specified them on the command line.
h2. Step 3: The user's SQL `SET` configuration is overridden

When the user configures the claim mode via SQL `SET` statements (or Flink configuration files), the value may be written under either the old or the new key:
{code:sql}
-- Use the new key:
SET 'execution.state-recovery.claim-mode' = 'CLAIM';
-- or the deprecated key:
SET 'execution.savepoint-restore-mode' = 'CLAIM'; {code}
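However, Step 2 has already written the default value (`NO_CLAIM`) under the {*}new key{*}. When the user's value arrives under the deprecated key, the configuration contains both keys, so the precedence rule from the Background section applies and the default stored under the new key wins over the user's `CLAIM`. A value set under the new key, by contrast, simply overwrites the default, which is why the new-key variant of the reproduction works.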
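The sequence in Steps 1 to 3 can be replayed with a small map-based model. This is a sketch under stated assumptions, not Flink code: the configuration is assumed to be a plain `Map<String, String>`, `writeSettings` is a hypothetical stand-in for `SavepointRestoreSettings.toConfiguration()` that always writes the restore mode under the new key, and `readClaimMode` applies the lookup order from the Background section.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical replay of the override described in Steps 1 to 3; not Flink code. */
final class ClaimModeOverrideDemo {

    static final String NEW_KEY = "execution.state-recovery.claim-mode";
    static final String OLD_KEY = "execution.savepoint-restore-mode";
    static final String DEFAULT = "NO_CLAIM";

    private ClaimModeOverrideDemo() {}

    /** Stand-in for toConfiguration(): always writes the restore mode under the new key. */
    static void writeSettings(Map<String, String> conf, String restoreMode) {
        conf.put(NEW_KEY, restoreMode);
    }

    /** Lookup order: new key, then deprecated key, then default. */
    static String readClaimMode(Map<String, String> conf) {
        if (conf.containsKey(NEW_KEY)) {
            return conf.get(NEW_KEY);
        }
        return conf.getOrDefault(OLD_KEY, DEFAULT);
    }

    /** Steps 1-2 write the default, then Step 3 applies the user's SET under userKey. */
    static String effectiveMode(String userKey) {
        Map<String, String> conf = new HashMap<>();
        writeSettings(conf, DEFAULT); // Steps 1-2: no claim mode on the CLI, default written
        conf.put(userKey, "CLAIM");   // Step 3: SET '<userKey>' = 'CLAIM'
        return readClaimMode(conf);
    }

    public static void main(String[] args) {
        System.out.println(effectiveMode(NEW_KEY)); // CLAIM: the user's value overwrites the default
        System.out.println(effectiveMode(OLD_KEY)); // NO_CLAIM: the default under the new key wins
    }
}
```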
h2. Root Cause

`SavepointRestoreSettings.toConfiguration()` does not distinguish between "the user explicitly set this value on the command line" and "this is a default because the user did not specify anything." It always writes both `RESTORE_MODE` and `SAVEPOINT_IGNORE_UNCLAIMED_STATE`, even when they should remain unset so that downstream configuration (e.g., SQL `SET`, `flink-conf.yaml`) can take effect.
h1. Suggested Fix

Modify `SavepointRestoreSettings.toConfiguration()` to skip writing `RESTORE_MODE` and `SAVEPOINT_IGNORE_UNCLAIMED_STATE` when the user did not explicitly provide them.
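One possible shape of this fix, sketched under assumptions: the "not provided" state is kept as an empty `Optional` instead of being replaced by the default during parsing, so the `toConfiguration` stand-in can skip the write. `ExplicitOnlySettingsSketch`, `parseCliRestoreMode`, and the other names are hypothetical and do not correspond to existing Flink APIs; the sketch models only the restore mode, and `SAVEPOINT_IGNORE_UNCLAIMED_STATE` would need the same treatment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of the suggested fix; names are illustrative, not Flink APIs. */
final class ExplicitOnlySettingsSketch {

    static final String NEW_KEY = "execution.state-recovery.claim-mode";
    static final String OLD_KEY = "execution.savepoint-restore-mode";
    static final String DEFAULT = "NO_CLAIM";

    private ExplicitOnlySettingsSketch() {}

    /** Empty when no claim/restore mode was passed on the command line (no default substituted). */
    static Optional<String> parseCliRestoreMode(String cliValueOrNull) {
        return Optional.ofNullable(cliValueOrNull);
    }

    /** Writes the restore mode only if it was explicitly provided. */
    static void toConfiguration(Optional<String> restoreMode, Map<String, String> conf) {
        restoreMode.ifPresent(mode -> conf.put(NEW_KEY, mode));
    }

    /** Lookup order: new key, then deprecated key, then default. */
    static String readClaimMode(Map<String, String> conf) {
        if (conf.containsKey(NEW_KEY)) {
            return conf.get(NEW_KEY);
        }
        return conf.getOrDefault(OLD_KEY, DEFAULT);
    }

    /** CLI value (may be null), then an optional later SQL SET of userKey = userValue. */
    static String effectiveMode(String cliValueOrNull, String userKey, String userValue) {
        Map<String, String> conf = new HashMap<>();
        toConfiguration(parseCliRestoreMode(cliValueOrNull), conf);
        if (userKey != null) {
            conf.put(userKey, userValue); // simulates a later SQL SET statement
        }
        return readClaimMode(conf);
    }

    public static void main(String[] args) {
        System.out.println(effectiveMode(null, OLD_KEY, "CLAIM")); // CLAIM: deprecated key honored
        System.out.println(effectiveMode(null, null, null));       // NO_CLAIM: default still applies
        System.out.println(effectiveMode("CLAIM", null, null));    // CLAIM: explicit CLI value kept
    }
}
```

The default is still applied, but only at read time, when the lookup finds neither key; nothing is written into the configuration that could shadow a value set later under the deprecated key.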
 
 



> SQL jobs may ignore user-configured claim mode with deprecated config key
> -------------------------------------------------------------------------
>
>                 Key: FLINK-39673
>                 URL: https://issues.apache.org/jira/browse/FLINK-39673
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.20.4
>            Reporter: Han Yin
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
