This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new f5e6345e2 [Bug] fixed the ignore restored option bug (#1839)
f5e6345e2 is described below
commit f5e6345e28636ba87189dbe37441a6d9531bd9d4
Author: Gerry <[email protected]>
AuthorDate: Sun Oct 23 23:55:37 2022 +0800
[Bug] fixed the ignore restored option bug (#1839)
* [Bug] fixed the ignore restored option bug
---
.../apache/streampark/flink/submit/bean/SubmitRequest.scala | 10 ++--------
.../streampark/flink/submit/trait/FlinkSubmitTrait.scala | 4 +++-
2 files changed, 5 insertions(+), 9 deletions(-)
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index be1016285..a37147f70 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -25,7 +25,7 @@ import org.apache.streampark.common.enums._
import org.apache.streampark.common.util.{DeflaterUtils, FlinkUtils, HdfsUtils, PropertiesUtils}
import org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResponse}
import org.apache.commons.io.FileUtils
-import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointRestoreSettings}
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
import java.io.File
import java.util.{Map => JavaMap}
@@ -74,13 +74,7 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
- lazy val savepointRestoreSettings: SavepointRestoreSettings = {
- lazy val allowNonRestoredState = Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
- savePoint match {
- case sp if Try(sp.isEmpty).getOrElse(true) => SavepointRestoreSettings.none
- case sp => SavepointRestoreSettings.forPath(sp, allowNonRestoredState)
- }
- }
+ lazy val allowNonRestoredState = Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
lazy val userJarFile: File = {
executionMode match {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 3d61d8de8..1c7369f0c 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -104,6 +104,9 @@ trait FlinkSubmitTrait extends Logger {
val retainedOption = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS
flinkConfig.set(retainedOption, flinkDefaultConfiguration.get(retainedOption))
+ //set savepoint.ignore-unclaimed-state parameter
+ flinkConfig.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, submitRequest.allowNonRestoredState)
+
setConfig(submitRequest, flinkConfig)
doSubmit(submitRequest, flinkConfig)
@@ -159,7 +162,6 @@ trait FlinkSubmitTrait extends Logger {
private[submit] def getJobGraph(flinkConfig: Configuration, submitRequest: SubmitRequest, jarFile: File): (PackagedProgram, JobGraph) = {
val packageProgram = PackagedProgram
.newBuilder
- .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
.setJarFile(jarFile)
.setEntryPointClassName(flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
.setArguments(