This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new f5e6345e2 [Bug] fixed the ignore restored option bug (#1839)
f5e6345e2 is described below

commit f5e6345e28636ba87189dbe37441a6d9531bd9d4
Author: Gerry <[email protected]>
AuthorDate: Sun Oct 23 23:55:37 2022 +0800

    [Bug] fixed the ignore restored option bug (#1839)
    
    * [Bug] fixed the ignore restored option bug
---
 .../apache/streampark/flink/submit/bean/SubmitRequest.scala    | 10 ++--------
 .../streampark/flink/submit/trait/FlinkSubmitTrait.scala       |  4 +++-
 2 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index be1016285..a37147f70 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -25,7 +25,7 @@ import org.apache.streampark.common.enums._
 import org.apache.streampark.common.util.{DeflaterUtils, FlinkUtils, HdfsUtils, PropertiesUtils}
 import org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResponse}
 import org.apache.commons.io.FileUtils
-import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointRestoreSettings}
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
 
 import java.io.File
 import java.util.{Map => JavaMap}
@@ -74,13 +74,7 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
 
   lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
 
-  lazy val savepointRestoreSettings: SavepointRestoreSettings = {
-    lazy val allowNonRestoredState = Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
-    savePoint match {
-      case sp if Try(sp.isEmpty).getOrElse(true) => SavepointRestoreSettings.none
-      case sp => SavepointRestoreSettings.forPath(sp, allowNonRestoredState)
-    }
-  }
+  lazy val allowNonRestoredState = Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
 
   lazy val userJarFile: File = {
     executionMode match {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 3d61d8de8..1c7369f0c 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -104,6 +104,9 @@ trait FlinkSubmitTrait extends Logger {
     val retainedOption = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS
     flinkConfig.set(retainedOption, flinkDefaultConfiguration.get(retainedOption))
 
+    //set savepoint.ignore-unclaimed-state parameter
+    flinkConfig.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, submitRequest.allowNonRestoredState)
+
     setConfig(submitRequest, flinkConfig)
 
     doSubmit(submitRequest, flinkConfig)
@@ -159,7 +162,6 @@ trait FlinkSubmitTrait extends Logger {
   private[submit] def getJobGraph(flinkConfig: Configuration, submitRequest: SubmitRequest, jarFile: File): (PackagedProgram, JobGraph) = {
     val packageProgram = PackagedProgram
       .newBuilder
-      .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
       .setJarFile(jarFile)
       .setEntryPointClassName(flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
       .setArguments(

Reply via email to