lgtm4e opened a new issue, #3307:
URL: https://github.com/apache/fluss/issues/3307

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.
   
   
   ### Fluss version
   
   0.9.0 (latest release)
   
   ### Please describe the bug 🐞
   
   ## Description
   
   When consuming a Fluss binlog table using Flink SQL, the Flink job fails during checkpoint/savepoint triggering with a `NullPointerException` in `FlinkSourceEnumerator.snapshotState`.
   
   The exception occurs consistently when executing checkpoint/savepoint operations (including stop-with-savepoint).
   
   It seems `leaseContext` is null when `snapshotState()` is invoked.
   
   ---
   
   ## Environment
   
   * Flink: 1.20.3
   * Fluss: 0.9.0-incubating
   * Connector: `fluss-flink-1.20-0.9.0-incubating.jar`
   
   ---
   
   ## Source Table
   
   ```sql
   CREATE TABLE fluss_catalog.dwd_layer.dwd_cust_info_history (
     change_type STRING,
     change_log_offset BIGINT NOT NULL,
     change_date STRING NOT NULL,
     change_commit_time TIMESTAMP(3),
     tenant_id STRING NOT NULL,
     cust_id BIGINT NOT NULL,
     sex INT,
     birthday DATE,
     age INT,
     org_id BIGINT,
     org_full_name STRING,
     cust_state INT,
     src_crtime TIMESTAMP(3),
     src_uptime TIMESTAMP(3),
     valid_from TIMESTAMP(3),
     etl_time TIMESTAMP(3),
     PRIMARY KEY (change_date, tenant_id, cust_id, change_log_offset) NOT ENFORCED
   ) PARTITIONED BY (change_date) WITH (
     'table.datalake.enabled' = 'true',
     'table.datalake.freshness' = '1min',
     'bucket.num' = '4'
   );
   ```
   
   ---
   
   ## Insert SQL
   
   ```sql
   INSERT INTO fluss_catalog.dwd_layer.dwd_cust_info_history
   SELECT
     _change_type AS change_type,
     _log_offset AS change_log_offset,
     DATE_FORMAT(_commit_timestamp, 'yyyyMMdd') AS change_date,
     CAST(_commit_timestamp AS TIMESTAMP(3)) AS change_commit_time,
     `after`.tenant_id,
     `after`.cust_id,
     `after`.sex,
     `after`.birthday,
     CASE
       WHEN `after`.birthday IS NOT NULL AND `after`.birthday <= CURRENT_DATE
         THEN CAST(TIMESTAMPDIFF(YEAR, `after`.birthday, CURRENT_DATE) AS INT)
       WHEN `after`.age IS NOT NULL AND `after`.age > 0 THEN `after`.age
       ELSE NULL
     END AS age,
     `after`.org_id,
     `after`.org_full_name,
     `after`.cust_state,
     `after`.crtime AS src_crtime,
     `after`.uptime AS src_uptime,
     COALESCE(`after`.uptime, `after`.crtime, CAST(_commit_timestamp AS TIMESTAMP(3))) AS valid_from,
     CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)) AS etl_time
   FROM fluss_catalog.ods_mysql.`ods_cust_info$binlog`
   /*+ OPTIONS('scan.startup.mode' = 'earliest') */
   WHERE `after`.tenant_id IS NOT NULL
     AND `after`.cust_id IS NOT NULL
     AND `after`.cust_id <> -1
     AND _log_offset IS NOT NULL
     AND _commit_timestamp IS NOT NULL
     AND _change_type IN ('insert', 'update');
   ```
   
   ---
   
   ## Error
   
   ```text
   [ERROR] Could not execute SQL statement. Reason:
   java.lang.NullPointerException: java.lang.NullPointerException: Cannot invoke "org.apache.fluss.flink.source.reader.LeaseContext.getKvSnapshotLeaseId()" because "this.leaseContext" is null
   ```
   
   ---
   
   ## Full Stack Trace
   
   ```text
   2026-05-13 02:36:03,796 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering stop-with-savepoint for job 9894357cf8b86e0a2c5d78b335598c50.
   2026-05-13 02:36:03,801 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 2 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1778639763798 for job 9894357cf8b86e0a2c5d78b335598c50.
   2026-05-13 02:36:04,412 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 2 for job 9894357cf8b86e0a2c5d78b335598c50. (0 consecutive failed attempts so far)
   org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$getCheckpointException$17(CheckpointCoordinator.java:2365) ~[flink-dist-1.20.3.jar:1.20.3]
        at java.base/java.util.Optional.orElseGet(Unknown Source) ~[?:?]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2364) ~[flink-dist-1.20.3.jar:1.20.3]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1056) ~[flink-dist-1.20.3.jar:1.20.3]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1034) ~[flink-dist-1.20.3.jar:1.20.3]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:760) ~[flink-dist-1.20.3.jar:1.20.3]
        at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
        at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[?:?]
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
        at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70) ~[flink-dist-1.20.3.jar:1.20.3]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
        at java.base/java.lang.Thread.run(Unknown Source) [?:?]
   Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.fluss.flink.source.reader.LeaseContext.getKvSnapshotLeaseId()" because "this.leaseContext" is null
        at org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator.snapshotState(FlinkSourceEnumerator.java:1031) ~[fluss-flink-1.20-0.9.0-incubating.jar:0.9.0-incubating]
        at org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator.snapshotState(FlinkSourceEnumerator.java:100) ~[fluss-flink-1.20-0.9.0-incubating.jar:0.9.0-incubating]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:577) ~[flink-dist-1.20.3.jar:1.20.3]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$7(SourceCoordinator.java:421) ~[flink-dist-1.20.3.jar:1.20.3]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:530) ~[flink-dist-1.20.3.jar:1.20.3]
        at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) ~[flink-dist-1.20.3.jar:1.20.3]
        ... 6 more
   2026-05-13 02:36:04,414 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 2 as aborted for source Source: ods_cust_info$binlog[35].
   ```
   
   ---
   
   ## Additional Observations
   
   * The job itself runs normally before checkpoint/savepoint is triggered.
   * The failure happens specifically during checkpoint/savepoint snapshotting.
   * The source table is a Fluss binlog table: `ods_cust_info$binlog`
   * The issue appears to be related to the lifecycle or initialization of `leaseContext` inside `FlinkSourceEnumerator`.
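
To make the suspected failure mode concrete, here is a minimal sketch of the pattern the stack trace points to. It is not the actual Fluss code: `EnumeratorSketch`, both `snapshotLeaseId*` methods, the `String` type of the lease id, and the idea that `leaseContext` stays null for a binlog source are all assumptions made for illustration. Only the names `LeaseContext` and `getKvSnapshotLeaseId()` come from the error message.

```java
import java.util.Optional;

// Hypothetical stand-in for org.apache.fluss.flink.source.reader.LeaseContext.
// Only the getter named in the stack trace is modeled; the String type is assumed.
final class LeaseContext {
    private final String kvSnapshotLeaseId;

    LeaseContext(String kvSnapshotLeaseId) {
        this.kvSnapshotLeaseId = kvSnapshotLeaseId;
    }

    String getKvSnapshotLeaseId() {
        return kvSnapshotLeaseId;
    }
}

// Hypothetical, heavily simplified enumerator; not the real FlinkSourceEnumerator.
final class EnumeratorSketch {
    // Assumption: this field can be null when no KV snapshot lease was ever set up.
    private final LeaseContext leaseContext;

    EnumeratorSketch(LeaseContext leaseContext) {
        this.leaseContext = leaseContext;
    }

    // Mirrors the failing pattern: dereferences the field unconditionally,
    // so a null leaseContext raises a NullPointerException.
    String snapshotLeaseIdUnsafe() {
        return leaseContext.getKvSnapshotLeaseId();
    }

    // Null-tolerant variant: an absent lease context yields an empty Optional
    // instead of failing the snapshot.
    Optional<String> snapshotLeaseIdSafe() {
        return Optional.ofNullable(leaseContext).map(LeaseContext::getKvSnapshotLeaseId);
    }
}
```

With `new EnumeratorSketch(null)`, the unsafe method throws the same kind of `NullPointerException` as in the trace, while the safe variant returns an empty `Optional`. Whether a guard like this is the right fix, or whether `leaseContext` should always be initialized before the first checkpoint, is for the maintainers to decide.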
   
   ---
   
   ## Questions
   
   1. Is this a known issue in Fluss 0.9.0-incubating?
   2. Is there any workaround or configuration to avoid this issue?
   3. Has this issue already been fixed in a newer Fluss version?
   
   Thanks.
   
   
   ### Solution
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

