This is an automated email from the ASF dual-hosted git repository.

lvshaokang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new b5d70605c [Bug] savepoint timeout bug fixed. (#2652)
b5d70605c is described below

commit b5d70605c8c24a32ed417663f7bbdf76e7f634e6
Author: benjobs <[email protected]>
AuthorDate: Wed Apr 19 11:02:02 2023 +0800

    [Bug] savepoint timeout bug fixed. (#2652)
    
    * savepoint timeout bug fixed.
    * remove unused package
---
 .../core/service/impl/AppBuildPipeServiceImpl.java     | 14 ++++++++++----
 .../core/service/impl/ApplicationServiceImpl.java      |  1 -
 .../src/api/flink/app/app.type.ts                      |  1 -
 .../app/components/AppView/StopApplicationModal.vue    | 18 +-----------------
 .../streampark/flink/client/bean/CancelRequest.scala   |  1 -
 .../flink/client/trait/FlinkClientTrait.scala          | 10 ++--------
 .../client/trait/KubernetesNativeClientTrait.scala     |  1 -
 7 files changed, 13 insertions(+), 33 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 5c092dfd4..6f8890bd2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.FileUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.util.WebUtils;
 import org.apache.streampark.console.core.entity.AppBuildPipeline;
 import org.apache.streampark.console.core.entity.Application;
@@ -210,13 +211,18 @@ public class AppBuildPipeServiceImpl
               }
             } else {
               if (!app.getDependencyObject().getJar().isEmpty()) {
+                String localUploads = Workspace.local().APP_UPLOADS();
                 // copy jar to local upload dir
                 for (String jar : app.getDependencyObject().getJar()) {
                   File localJar = new File(WebUtils.getAppTempDir(), jar);
-                  Utils.required(localJar.exists());
-                  String localUploads = Workspace.local().APP_UPLOADS();
-                  String uploadJar = localUploads.concat("/").concat(jar);
-                  checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar, localUploads);
+                  File uploadJar = new File(localUploads, jar);
+                  if (!localJar.exists() && !uploadJar.exists()) {
+                    throw new ApiAlertException("Missing file: " + jar + ", please upload again");
+                  }
+                  if (localJar.exists()) {
+                    checkOrElseUploadJar(
+                        FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(), localUploads);
+                  }
                 }
               }
             }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 2ea6f2d75..3fbee60fa 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1272,7 +1272,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             appParam.getSavePointed(),
             appParam.getDrain(),
             customSavepoint,
-            appParam.getSavePointTimeout(),
             application.getK8sNamespace());
 
     final Date triggerTime = new Date();
diff --git a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
index b5e2017e7..a56cb4dad 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
@@ -147,7 +147,6 @@ interface AppControl {
 export interface CancelParam {
   id: string;
   savePointed: boolean;
-  savePointTimeout: number;
   drain: boolean;
   savePoint: string;
 }
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
index db19b5022..34668f76b 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
@@ -69,16 +69,6 @@
         afterItem: () => h('span', { class: 'conf-switch' }, 'cancel job with savepoint path'),
         ifShow: ({ values }) => !!values.stopSavePointed,
       },
-      {
-        field: 'savePointTimeout',
-        label: 'Savepoint Timeout',
-        component: 'Select',
-        afterItem: () => h('span', { class: 'conf-switch' }, 'savepoint timeout, Unit: second'),
-        slot: 'savePointTimeout',
-        defaultValue: 60,
-        ifShow: ({ values }) => !!values.stopSavePointed,
-        required: true,
-      },
       {
         field: 'drain',
         label: 'Drain',
@@ -101,13 +91,12 @@
   /* submit */
   async function handleSubmit() {
     try {
-      const { stopSavePointed, customSavepoint, savePointTimeout, drain } =
+      const { stopSavePointed, customSavepoint, drain } =
         (await validate()) as Recordable;
       const stopReq = {
         id: app.id,
         savePointed: stopSavePointed,
         savePoint: customSavepoint,
-        savePointTimeout: savePointTimeout,
         drain: drain,
       };
 
@@ -166,10 +155,5 @@
       <SvgIcon name="shutdown" style="color: red" />
       {{ t('flink.app.view.stop') }}
     </template>
-    <BasicForm @register="registerForm" class="!pt-30px">
-      <template #savePointTimeout="{ model, field }">
-        <InputNumber v-model:value="model[field]" :min="10" :max="7200" step="10" />
-      </template>
-    </BasicForm>
   </BasicModal>
 </template>
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
index 99615d5d8..f11f561c9 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
@@ -33,6 +33,5 @@ case class CancelRequest(
     override val withSavepoint: Boolean,
     withDrain: Boolean,
     savepointPath: String,
-    savePointTimeout: Long,
     override val kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
   extends SavepointRequestTrait
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 5b303e588..325bc9ff3 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -40,9 +40,7 @@ import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
 
 import java.io.File
-import java.time.Duration
 import java.util.{Collections, List => JavaList, Map => JavaMap}
-import java.util.concurrent.TimeUnit
 
 import scala.annotation.tailrec
 import scala.collection.JavaConversions._
@@ -181,7 +179,6 @@ trait FlinkClientTrait extends Logger {
          |     clusterId         : ${cancelRequest.clusterId}
          |     withSavePoint     : ${cancelRequest.withSavepoint}
          |     savePointPath     : ${cancelRequest.savepointPath}
-         |     savePointTimeout  : ${cancelRequest.savePointTimeout}
          |     withDrain         : ${cancelRequest.withDrain}
          |     k8sNamespace      : ${cancelRequest.kubernetesNamespace}
          |     appId             : ${cancelRequest.clusterId}
@@ -189,9 +186,6 @@ trait FlinkClientTrait extends Logger {
          |-------------------------------------------------------------------------------------------
          |""".stripMargin)
     val flinkConf = new Configuration()
-    flinkConf.safeSet(
-      ClientOptions.CLIENT_TIMEOUT,
-      Duration.ofSeconds(cancelRequest.savePointTimeout))
     doCancel(cancelRequest, flinkConf)
   }
 
@@ -531,11 +525,11 @@ trait FlinkClientTrait extends Logger {
       case (true, false) =>
         clientWrapper
           .cancelWithSavepoint(jobID, savePointDir)
-          .get(cancelRequest.savePointTimeout, TimeUnit.SECONDS)
+          .get()
       case (_, _) =>
         clientWrapper
           .stopWithSavepoint(jobID, cancelRequest.withDrain, savePointDir)
-          .get(cancelRequest.savePointTimeout, TimeUnit.SECONDS)
+          .get()
     }
   }
 
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 27f5fe7a5..2dd756b8f 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -19,7 +19,6 @@ package org.apache.streampark.flink.client.`trait`
 
 import org.apache.streampark.common.enums.{ExecutionMode, FlinkK8sRestExposedType}
 import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.kubernetes.IngressController
 import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
 
 import org.apache.commons.lang3.StringUtils

Reply via email to