lgtm4e opened a new issue, #3307: URL: https://github.com/apache/fluss/issues/3307
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.

### Fluss version

0.9.0 (latest release)

### Please describe the bug 🐞

## Description

When consuming a Fluss binlog table using Flink SQL, the Flink job fails during checkpoint/savepoint triggering with a `NullPointerException` in `FlinkSourceEnumerator.snapshotState`.

The exception occurs consistently when executing checkpoint/savepoint operations (including stop-with-savepoint). It seems `leaseContext` is null when `snapshotState()` is invoked.

---

## Environment

* Flink: 1.20.3
* Fluss: 0.9.0-incubating
* Connector: `fluss-flink-1.20-0.9.0-incubating.jar`

---

## Source Table

```sql
CREATE TABLE fluss_catalog.dwd_layer.dwd_cust_info_history (
  change_type STRING,
  change_log_offset BIGINT NOT NULL,
  change_date STRING NOT NULL,
  change_commit_time TIMESTAMP(3),
  tenant_id STRING NOT NULL,
  cust_id BIGINT NOT NULL,
  sex INT,
  birthday DATE,
  age INT,
  org_id BIGINT,
  org_full_name STRING,
  cust_state INT,
  src_crtime TIMESTAMP(3),
  src_uptime TIMESTAMP(3),
  valid_from TIMESTAMP(3),
  etl_time TIMESTAMP(3),
  PRIMARY KEY (change_date, tenant_id, cust_id, change_log_offset) NOT ENFORCED
) PARTITIONED BY (change_date) WITH (
  'table.datalake.enabled' = 'true',
  'table.datalake.freshness' = '1min',
  'bucket.num' = '4'
);
```

---

## Insert SQL

```sql
INSERT INTO fluss_catalog.dwd_layer.dwd_cust_info_history
SELECT
  _change_type AS change_type,
  _log_offset AS change_log_offset,
  DATE_FORMAT(_commit_timestamp, 'yyyyMMdd') AS change_date,
  CAST(_commit_timestamp AS TIMESTAMP(3)) AS change_commit_time,
  `after`.tenant_id,
  `after`.cust_id,
  `after`.sex,
  `after`.birthday,
  CASE
    WHEN `after`.birthday IS NOT NULL AND `after`.birthday <= CURRENT_DATE
      THEN CAST(TIMESTAMPDIFF(YEAR, `after`.birthday, CURRENT_DATE) AS INT)
    WHEN `after`.age IS NOT NULL AND `after`.age > 0
      THEN `after`.age
    ELSE NULL
  END AS age,
  `after`.org_id,
  `after`.org_full_name,
  `after`.cust_state,
  `after`.crtime AS src_crtime,
  `after`.uptime AS src_uptime,
  COALESCE(`after`.uptime, `after`.crtime, CAST(_commit_timestamp AS TIMESTAMP(3))) AS valid_from,
  CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)) AS etl_time
FROM fluss_catalog.ods_mysql.`ods_cust_info$binlog` /*+ OPTIONS('scan.startup.mode' = 'earliest') */
WHERE `after`.tenant_id IS NOT NULL
  AND `after`.cust_id IS NOT NULL
  AND `after`.cust_id <> -1
  AND _log_offset IS NOT NULL
  AND _commit_timestamp IS NOT NULL
  AND _change_type IN ('insert', 'update');
```

---

## Error

```text
[ERROR] Could not execute SQL statement. Reason:
java.lang.NullPointerException: java.lang.NullPointerException: Cannot invoke "org.apache.fluss.flink.source.reader.LeaseContext.getKvSnapshotLeaseId()" because "this.leaseContext" is null
```

---

## Full Stack Trace

```text
2026-05-13 02:36:03,796 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering stop-with-savepoint for job 9894357cf8b86e0a2c5d78b335598c50.
2026-05-13 02:36:03,801 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 2 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1778639763798 for job 9894357cf8b86e0a2c5d78b335598c50.
2026-05-13 02:36:04,412 WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 2 for job 9894357cf8b86e0a2c5d78b335598c50. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$getCheckpointException$17(CheckpointCoordinator.java:2365) ~[flink-dist-1.20.3.jar:1.20.3]
	at java.base/java.util.Optional.orElseGet(Unknown Source) ~[?:?]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2364) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1056) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1034) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:760) ~[flink-dist-1.20.3.jar:1.20.3]
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
	at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70) ~[flink-dist-1.20.3.jar:1.20.3]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.base/java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.fluss.flink.source.reader.LeaseContext.getKvSnapshotLeaseId()" because "this.leaseContext" is null
	at org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator.snapshotState(FlinkSourceEnumerator.java:1031) ~[fluss-flink-1.20-0.9.0-incubating.jar:0.9.0-incubating]
	at org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator.snapshotState(FlinkSourceEnumerator.java:100) ~[fluss-flink-1.20-0.9.0-incubating.jar:0.9.0-incubating]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:577) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$7(SourceCoordinator.java:421) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:530) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) ~[flink-dist-1.20.3.jar:1.20.3]
	... 6 more
2026-05-13 02:36:04,414 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 2 as aborted for source Source: ods_cust_info$binlog[35].
```

---

## Additional Observations

* The job itself runs normally before a checkpoint/savepoint is triggered.
* The failure happens specifically during checkpoint/savepoint snapshotting.
* The source table is a Fluss binlog table: `ods_cust_info$binlog`
* The issue appears to be related to the `leaseContext` lifecycle/initialization inside `FlinkSourceEnumerator`.

---

## Questions

1. Is this a known issue in Fluss 0.9.0-incubating?
2. Is there any workaround or configuration to avoid this issue?
3. Has this issue already been fixed in a newer Fluss version?

Thanks.

### Solution

_No response_

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!

-- 
This is an automated message from the Apache Git Service.
