tomma-a opened a new issue, #10193:
URL: https://github.com/apache/seatunnel/issues/10193

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   In my test, SeaTunnel 2.3.12 runs on the Flink engine (1.17 or 1.18) with Flink
upgradeMode: savepoint or last-state. When I upgrade the FlinkDeployment (with
the Flink Kubernetes operator), I encounter the error shown in the log below.
   
   It seems that SeaTunnel's FlinkSourceSplitEnumeratorContext uses reflection
while restoring the SeaTunnel job, and that this is not yet compatible with
newer versions of Flink (just my guess).
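
To illustrate the guess above: the "Caused by" line in the log reports a NullPointerException from calling getClass() on a null object inside getJobIdForV15. A minimal sketch of that failure mode, assuming the job ID is read through a chain of reflective field lookups, is below. This is not SeaTunnel's or Flink's actual code; every class, field, and method name here (Outer, Inner, jobId, readField, findJobId) is hypothetical.

```java
import java.lang.reflect.Field;
import java.util.Optional;

/** Hypothetical illustration only; none of these names come from SeaTunnel or Flink. */
final class ReflectiveJobIdSketch {

    /** Stand-in for a runtime object that holds the job id. */
    static final class Inner {
        private final String jobId;

        Inner(String jobId) {
            this.jobId = jobId;
        }
    }

    /** Stand-in for a context whose nested field may still be null during a restore. */
    static final class Outer {
        private final Inner inner;

        Outer(Inner inner) {
            this.inner = inner;
        }
    }

    /** Reads a private field by name; returns null when the target itself is null. */
    private static Object readField(Object target, String name)
            throws ReflectiveOperationException {
        if (target == null) {
            // Without this guard, target.getClass() throws the NullPointerException
            // of the kind reported in the log.
            return null;
        }
        Field field = target.getClass().getDeclaredField(name);
        field.setAccessible(true);
        return field.get(target);
    }

    /** Walks outer.inner.jobId reflectively; empty when any link in the chain is missing. */
    static Optional<String> findJobId(Object outer) {
        try {
            Object inner = readField(outer, "inner");
            Object jobId = readField(inner, "jobId");
            return Optional.ofNullable(jobId).map(Object::toString);
        } catch (ReflectiveOperationException e) {
            return Optional.empty();
        }
    }
}
```

With a guard like this, a null intermediate object yields an empty result instead of an exception, so the caller can decide how to proceed when the job ID is unavailable during a restore. Whether that is the right fix for SeaTunnel is for the maintainers to judge.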
   
   
   
   
   
   
   
   
   ### SeaTunnel Version
   
   seatunnel 2.3.12
   
   ### SeaTunnel Config
   
   ```conf
   Here are the settings for the test above.
   
   SeaTunnel job config (a streaming job):
   
   `
   env {
     parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval=60000
     flink.execution.checkpointing.mode = "EXACTLY_ONCE"
     flink.execution.checkpointing.timeout = 600000
   }
   
   source {
     Kafka {
       plugin_output="fake2"
       topic = info
       consumer.group="testr"
       bootstrap.servers = "tom-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
       format = json
     }
   }
   
   sink {
     Kafka {
       plugin_input="fake2"
       topic = topc
       bootstrap.servers = "tom-cluster1-kafka-bootstrap.kafka.svc.cluster.local:9092"
       format = json
       kafka.request.timeout.ms = 60000
       semantics = EXACTLY_ONCE
     }
   }
   
   `
   
   Then I run the SeaTunnel job on Flink (via the Flink k8s operator) using the
FlinkDeployment CRD:
   
   `
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: seatunnel-flink-streaming-example-2
     namespace: kafka
   spec:
     ......
   
         volumes:
           - name: seatunnel-config
             configMap:
               name: seatunnel-config
     job:
       jarURI: local:///opt/seatunnel/starter/seatunnel-flink-15-starter.jar
       entryClass: org.apache.seatunnel.core.starter.flink.SeaTunnelFlink
       args: ["--config", "/data/seatunnel.streaming.conf"]
       parallelism: 2
       upgradeMode: savepoint
   `
   
   The first time I kubectl apply the above YAML to a k8s cluster, the
SeaTunnel job runs normally and the Flink checkpoints are saved periodically
and successfully.
   
   Then I make some changes to the above YAML file and apply it again in k8s,
which triggers an upgrade of the deployment.
   My Flink upgradeMode is savepoint (last-state, which uses the last
checkpoint, does not work either).
   The above error then happens: the job can NOT be restored from the last
checkpoint or savepoint!
   
   Please correct me if I'm wrong about this, thanks.
   ```
   
   ### Running Command
   
   ```shell
   By applying a k8s FlinkDeployment CRD (kubectl apply)
   ```
   
   ### Error Exception
   
   ```log
   2025-12-10 10:44:00,103 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No master state to restore
   2025-12-10 10:44:00,104 INFO  org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator [] - Resetting coordinator to checkpoint.
   2025-12-10 10:44:00,107 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Closing SourceCoordinator for source Source: Kafka-Source.
   2025-12-10 10:44:00,108 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: Kafka-Source closed.
   2025-12-10 10:44:00,111 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Restoring SplitEnumerator of source Source: Kafka-Source from checkpoint.
   2025-12-10 10:44:00,221 WARN  org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext [] - Get flink job id failed
   java.lang.IllegalStateException: Initialize flink job-id failed
       at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getJobIdForV15(FlinkSourceSplitEnumeratorContext.java:152) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
       at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getFlinkJobId(FlinkSourceSplitEnumeratorContext.java:100) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
       at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.<init>(FlinkSourceSplitEnumeratorContext.java:57) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
       at org.apache.seatunnel.translation.flink.source.FlinkSource.restoreEnumerator(FlinkSource.java:116) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
       at org.apache.seatunnel.translation.flink.source.FlinkSource.restoreEnumerator(FlinkSource.java:48) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
       at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:444) ~[flink-dist-1.17.2.jar:1.17.2]
       at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:406) ~[flink-dist-1.17.2.jar:1.17.2]
       at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$7(RecreateOnResetOperatorCoordinator.java:155) ~[flink-dist-1.17.2.jar:1.17.2]
       at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
       at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Unknown Source) ~[?:?]
       at java.util.concurrent.CompletableFuture.whenComplete(Unknown Source) ~[?:?]
       at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156) ~[flink-dist-1.18.0.jar:1.18.0]
       at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122) ~[flink-dist-1.18.0.jar:1.18.0]
       at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379) ~[flink-dist-1.18.0.jar:1.18.0]
       at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356) ~[flink-dist-1.18.0.jar:1.18.0]
       at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128) ~[flink-dist-1.18.0.jar:1.18.0]
       at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100) ~[flink-dist-1.18.0.jar:1.18.0]
       at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.18.0.jar:1.18.0]
       at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
       at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
       at java.lang.Thread.run(Unknown Source) [?:?]
   Caused by: java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "obj" is null
       at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getJobIdForV15(FlinkSourceSplitEnumeratorContext.java:142) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-fd7fca630f268e360191f42ba11014b3:2.3.12]
   ```
   
   ### Zeta or Flink or Spark Version
   
   Flink: 1.16, 1.17, 1.18
   
   I have not tested other versions of Flink.
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

