Han Yin created FLINK-39673:
-------------------------------
Summary: SQL jobs may ignore user-configured claim mode with
deprecated config key
Key: FLINK-39673
URL: https://issues.apache.org/jira/browse/FLINK-39673
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.20.4
Reporter: Han Yin
h1. Background
In Flink 1.20, the state recovery configuration keys were renamed:
||Old Key (deprecated)||New Key||
|execution.savepoint-restore-mode|execution.state-recovery.claim-mode|
The new keys are defined in `StateRecoveryOptions` with
`withDeprecatedKeys(...)` pointing to the old keys:
{code:java}
// StateRecoveryOptions.java
public static final ConfigOption<RestoreMode> RESTORE_MODE =
        key("execution.state-recovery.claim-mode")
                .enumType(RestoreMode.class)
                .defaultValue(RestoreMode.DEFAULT) // DEFAULT = NO_CLAIM
                .withDeprecatedKeys("execution.savepoint-restore-mode")
                ...{code}
Flink's `withDeprecatedKeys` mechanism defines the following resolution rules
(see [FLIP-406|https://issues.apache.org/jira/browse/FLINK-34255] for details):
* If the user configures *only the new key*
(`execution.state-recovery.claim-mode`), it takes effect.
* If the user configures *only the old (deprecated) key*
(`execution.savepoint-restore-mode`), it also takes effect —
`withDeprecatedKeys` ensures backward compatibility by falling back to the old
key.
* If *both* keys are configured, the *new key takes precedence* over the
deprecated one.
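
The three rules above can be sketched as a small lookup over a key/value map. This is only an illustrative model, not Flink's actual `Configuration` code: the class `DeprecatedKeyResolution` and its `resolve` method are hypothetical names, and claim modes are represented as plain strings for brevity.

```java
import java.util.Map;

// Hypothetical model of deprecated-key fallback; not Flink's real implementation.
final class DeprecatedKeyResolution {
    static final String NEW_KEY = "execution.state-recovery.claim-mode";
    static final String OLD_KEY = "execution.savepoint-restore-mode";
    static final String DEFAULT_VALUE = "NO_CLAIM";

    // Resolution order: new key, then deprecated key, then the default.
    static String resolve(Map<String, String> conf) {
        if (conf.containsKey(NEW_KEY)) {
            return conf.get(NEW_KEY);
        }
        if (conf.containsKey(OLD_KEY)) {
            return conf.get(OLD_KEY);
        }
        return DEFAULT_VALUE;
    }

    public static void main(String[] args) {
        // Only the new key is set.
        System.out.println(resolve(Map.of(NEW_KEY, "CLAIM")));
        // Only the deprecated key is set: the lookup falls back to it.
        System.out.println(resolve(Map.of(OLD_KEY, "CLAIM")));
        // Both are set: the new key takes precedence.
        System.out.println(resolve(Map.of(NEW_KEY, "NO_CLAIM", OLD_KEY, "CLAIM")));
        // Neither is set: the default applies.
        System.out.println(resolve(Map.of()));
    }
}
```

The third case matters for this issue: once any value exists under the new key, the deprecated key is no longer consulted.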
h1. Problem and Steps to Reproduce
Start a Flink standalone cluster.
Submit a SQL job restoring from a checkpoint. For example:
{code:sql}
-- SQL
SET 'execution.savepoint.path' = 'hdfs:///checkpoints/xxx/chk-xx';
SET 'execution.state-recovery.claim-mode' = 'CLAIM';

CREATE TABLE source_table (
    word STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.word.length' = '10'
);

CREATE TABLE sink_table (
    word STRING,
    word_count BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO sink_table
SELECT word, COUNT(*) AS word_count
FROM source_table
GROUP BY word; {code}
The SQL above sets the claim mode using the {*}new key{*}. In this case, the
job restores in *CLAIM* mode as expected.
Now change the second `SET` statement to use the *deprecated key* instead:
{code:sql}
SET 'execution.savepoint-restore-mode' = 'CLAIM';{code}
{*}Expected{*}: The job should still restore in *CLAIM* mode — per the
`withDeprecatedKeys` contract, the old key should be honored when the new key
is not explicitly set.
{*}Actual{*}: The job restores in *NO_CLAIM* mode — the default value — which
directly conflicts with what the user explicitly specified via the SQL `SET`
statement. The practical consequence is that Flink does *not* claim ownership
of the restored checkpoint. As the job continues running and produces newer
checkpoints that subsume the old ones, the original checkpoint provided above
will *never be cleaned up* and will remain orphaned on HDFS indefinitely,
leading to unnecessary storage consumption.
h1. Cause: User-configured value can be overridden by default value
When a SQL job is submitted through SQL Gateway, the following happens:
h2. Step 1: CommandLine parsing creates `SavepointRestoreSettings` with defaults
`CliFrontendParser.createSavepointRestoreSettings()` is called. When the user
does *not* provide `--claimMode` / `--restoreMode` on the command line, the
method falls into the `else` branch and uses the default value:
{code:java}
// CliFrontendParser.java
public static SavepointRestoreSettings createSavepointRestoreSettings(
        CommandLine commandLine) {
    if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
        ...
        if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) {
            restoreMode = ...;
        } else if (commandLine.hasOption(SAVEPOINT_RESTORE_MODE)) {
            restoreMode = ...;
        } else {
            // Neither option given: falls back to the default, NO_CLAIM
            restoreMode = StateRecoveryOptions.RESTORE_MODE.defaultValue();
        }
        return SavepointRestoreSettings.forPath(
                savepointPath, allowNonRestoredState, restoreMode);
    }
    ...
} {code}
Note the asymmetry in how these settings are then written to the configuration
(see Step 2): `SAVEPOINT_PATH` is set conditionally (only when non-null), but
`RESTORE_MODE` and `SAVEPOINT_IGNORE_UNCLAIMED_STATE` are *always* set, even
when the user never specified them on the command line.
h2. Step 2: `toConfiguration()` unconditionally writes default values using the new key
{code:java}
// SavepointRestoreSettings.java
public static void toConfiguration(
        final SavepointRestoreSettings savepointRestoreSettings,
        final Configuration configuration) {
    // always set
    configuration.set(
            StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
            savepointRestoreSettings.allowNonRestoredState());
    // always set
    configuration.set(
            StateRecoveryOptions.RESTORE_MODE,
            savepointRestoreSettings.getRestoreMode());
    // set only when a path is present
    final String savepointPath = savepointRestoreSettings.getRestorePath();
    if (savepointPath != null) {
        configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath);
    }
} {code}
h2. Step 3: User's SQL `SET` configuration is overridden
When the user configures the claim mode via SQL `SET` statements (or Flink
configuration files), the value may be written using either the old or new key:
{code:sql}
-- Either the new key:
SET 'execution.state-recovery.claim-mode' = 'CLAIM';
-- or the deprecated key:
SET 'execution.savepoint-restore-mode' = 'CLAIM'; {code}
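However, Step 2 has already written the default value (`NO_CLAIM`) under the
{*}new key{*}. Because the new key takes precedence over the deprecated one
whenever both are present, the `CLAIM` value that the user set through the old
key is never consulted, and the job restores with the default `NO_CLAIM`.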
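
The interaction of Steps 2 and 3 can be reproduced with a plain map standing in for the configuration. This is a simplified model under stated assumptions: `ClaimModeOverrideDemo`, `writeDefaults`, and `resolve` are hypothetical names, and `resolve` only approximates the new-key-first lookup described in the Background section.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reproduction of the override; a plain Map stands in for Flink's Configuration.
final class ClaimModeOverrideDemo {
    static final String NEW_KEY = "execution.state-recovery.claim-mode";
    static final String OLD_KEY = "execution.savepoint-restore-mode";

    // Step 2 (modelled): the default is always written under the new key.
    static void writeDefaults(Map<String, String> conf) {
        conf.put(NEW_KEY, "NO_CLAIM");
    }

    // Lookup (modelled): the new key wins; the deprecated key is only a fallback.
    static String resolve(Map<String, String> conf) {
        if (conf.containsKey(NEW_KEY)) {
            return conf.get(NEW_KEY);
        }
        return conf.getOrDefault(OLD_KEY, "NO_CLAIM");
    }

    // Runs Step 2, then a user SET with the given key, and returns the effective mode.
    static String effectiveModeAfterSet(String userKey, String userValue) {
        Map<String, String> conf = new HashMap<>();
        writeDefaults(conf);           // Step 2: default written under the new key
        conf.put(userKey, userValue);  // Step 3: user's SQL SET
        return resolve(conf);
    }

    public static void main(String[] args) {
        // SET with the new key replaces the default that Step 2 wrote.
        System.out.println("new key:        " + effectiveModeAfterSet(NEW_KEY, "CLAIM"));
        // SET with the deprecated key is shadowed by that default.
        System.out.println("deprecated key: " + effectiveModeAfterSet(OLD_KEY, "CLAIM"));
    }
}
```

In this model the deprecated-key case yields `NO_CLAIM` regardless of whether the user's `SET` is applied before or after the default is written, since the lookup never reaches the old key once the new key holds a value.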
h2. Root Cause
`SavepointRestoreSettings.toConfiguration()` does not distinguish between "user
explicitly set this value via command line" and "this is a default because the
user didn't specify anything." It always writes both `RESTORE_MODE` and
`SAVEPOINT_IGNORE_UNCLAIMED_STATE`, even when they should remain unset to allow
downstream configuration (e.g., SQL `SET`, `flink-conf.yaml`) to take effect.
h1. Suggested Fix
Modify `SavepointRestoreSettings.toConfiguration()` to skip writing
`RESTORE_MODE` and
`SAVEPOINT_IGNORE_UNCLAIMED_STATE` when they were not explicitly provided by
the user.
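
One possible shape for such a fix is sketched below, again with a plain map in place of Flink's `Configuration`. Everything here is an assumption for illustration: the class `ExplicitOnlySettingsSketch`, the use of `Optional` to mark "not given on the command line", and the string-valued modes are not part of the existing `SavepointRestoreSettings` API. `SAVEPOINT_IGNORE_UNCLAIMED_STATE` would be handled the same way and is omitted for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the suggested fix; not the actual Flink patch.
final class ExplicitOnlySettingsSketch {
    static final String NEW_KEY = "execution.state-recovery.claim-mode";
    static final String OLD_KEY = "execution.savepoint-restore-mode";

    // Writes the claim mode only when it was explicitly provided.
    // An empty Optional means "not specified", so nothing is written.
    static void toConfiguration(Optional<String> explicitClaimMode, Map<String, String> conf) {
        explicitClaimMode.ifPresent(mode -> conf.put(NEW_KEY, mode));
    }

    // Modelled lookup: new key first, then the deprecated key, then the default.
    static String resolve(Map<String, String> conf) {
        if (conf.containsKey(NEW_KEY)) {
            return conf.get(NEW_KEY);
        }
        return conf.getOrDefault(OLD_KEY, "NO_CLAIM");
    }

    // Simulates a user SET on the deprecated key plus optional command-line input.
    static String effectiveMode(Optional<String> commandLineMode, String deprecatedKeyValue) {
        Map<String, String> conf = new HashMap<>();
        conf.put(OLD_KEY, deprecatedKeyValue);
        toConfiguration(commandLineMode, conf);
        return resolve(conf);
    }

    public static void main(String[] args) {
        // Nothing on the command line: the deprecated key is honored.
        System.out.println(effectiveMode(Optional.empty(), "CLAIM"));
        // Explicit command-line value: it is written under the new key and wins.
        System.out.println(effectiveMode(Optional.of("NO_CLAIM"), "CLAIM"));
    }
}
```

With the default no longer written eagerly, the deprecated key set via SQL `SET` or a configuration file remains visible to the lookup, while an explicit command-line value still takes precedence.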