This is an automated email from the ASF dual-hosted git repository.
lvshaokang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new b5d70605c [Bug] savepoint timeout bug fixed. (#2652)
b5d70605c is described below
commit b5d70605c8c24a32ed417663f7bbdf76e7f634e6
Author: benjobs <[email protected]>
AuthorDate: Wed Apr 19 11:02:02 2023 +0800
[Bug] savepoint timeout bug fixed. (#2652)
* savepoint timeout bug fixed.
* remove unused package
---
.../core/service/impl/AppBuildPipeServiceImpl.java | 14 ++++++++++----
.../core/service/impl/ApplicationServiceImpl.java | 1 -
.../src/api/flink/app/app.type.ts | 1 -
.../app/components/AppView/StopApplicationModal.vue | 18 +-----------------
.../streampark/flink/client/bean/CancelRequest.scala | 1 -
.../flink/client/trait/FlinkClientTrait.scala | 10 ++--------
.../client/trait/KubernetesNativeClientTrait.scala | 1 -
7 files changed, 13 insertions(+), 33 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 5c092dfd4..6f8890bd2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
import org.apache.streampark.console.core.entity.Application;
@@ -210,13 +211,18 @@ public class AppBuildPipeServiceImpl
}
} else {
if (!app.getDependencyObject().getJar().isEmpty()) {
+ String localUploads = Workspace.local().APP_UPLOADS();
// copy jar to local upload dir
for (String jar : app.getDependencyObject().getJar()) {
File localJar = new File(WebUtils.getAppTempDir(), jar);
- Utils.required(localJar.exists());
- String localUploads = Workspace.local().APP_UPLOADS();
- String uploadJar = localUploads.concat("/").concat(jar);
- checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar, localUploads);
+ File uploadJar = new File(localUploads, jar);
+ if (!localJar.exists() && !uploadJar.exists()) {
+ throw new ApiAlertException("Missing file: " + jar + ", please upload again");
+ }
+ if (localJar.exists()) {
+ checkOrElseUploadJar(
+ FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(), localUploads);
+ }
}
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 2ea6f2d75..3fbee60fa 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1272,7 +1272,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
appParam.getSavePointed(),
appParam.getDrain(),
customSavepoint,
- appParam.getSavePointTimeout(),
application.getK8sNamespace());
final Date triggerTime = new Date();
diff --git a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
index b5e2017e7..a56cb4dad 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
@@ -147,7 +147,6 @@ interface AppControl {
export interface CancelParam {
id: string;
savePointed: boolean;
- savePointTimeout: number;
drain: boolean;
savePoint: string;
}
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
index db19b5022..34668f76b 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
@@ -69,16 +69,6 @@
afterItem: () => h('span', { class: 'conf-switch' }, 'cancel job with savepoint path'),
ifShow: ({ values }) => !!values.stopSavePointed,
},
- {
- field: 'savePointTimeout',
- label: 'Savepoint Timeout',
- component: 'Select',
- afterItem: () => h('span', { class: 'conf-switch' }, 'savepoint timeout, Unit: second'),
- slot: 'savePointTimeout',
- defaultValue: 60,
- ifShow: ({ values }) => !!values.stopSavePointed,
- required: true,
- },
{
field: 'drain',
label: 'Drain',
@@ -101,13 +91,12 @@
/* submit */
async function handleSubmit() {
try {
- const { stopSavePointed, customSavepoint, savePointTimeout, drain } =
+ const { stopSavePointed, customSavepoint, drain } =
(await validate()) as Recordable;
const stopReq = {
id: app.id,
savePointed: stopSavePointed,
savePoint: customSavepoint,
- savePointTimeout: savePointTimeout,
drain: drain,
};
@@ -166,10 +155,5 @@
<SvgIcon name="shutdown" style="color: red" />
{{ t('flink.app.view.stop') }}
</template>
- <BasicForm @register="registerForm" class="!pt-30px">
- <template #savePointTimeout="{ model, field }">
- <InputNumber v-model:value="model[field]" :min="10" :max="7200" step="10" />
- </template>
- </BasicForm>
</BasicModal>
</template>
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
index 99615d5d8..f11f561c9 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
@@ -33,6 +33,5 @@ case class CancelRequest(
override val withSavepoint: Boolean,
withDrain: Boolean,
savepointPath: String,
- savePointTimeout: Long,
override val kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
extends SavepointRequestTrait
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 5b303e588..325bc9ff3 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -40,9 +40,7 @@ import org.apache.flink.util.FlinkException
import org.apache.flink.util.Preconditions.checkNotNull
import java.io.File
-import java.time.Duration
import java.util.{Collections, List => JavaList, Map => JavaMap}
-import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.collection.JavaConversions._
@@ -181,7 +179,6 @@ trait FlinkClientTrait extends Logger {
| clusterId : ${cancelRequest.clusterId}
| withSavePoint : ${cancelRequest.withSavepoint}
| savePointPath : ${cancelRequest.savepointPath}
- | savePointTimeout : ${cancelRequest.savePointTimeout}
| withDrain : ${cancelRequest.withDrain}
| k8sNamespace : ${cancelRequest.kubernetesNamespace}
| appId : ${cancelRequest.clusterId}
@@ -189,9 +186,6 @@ trait FlinkClientTrait extends Logger {
|-------------------------------------------------------------------------------------------
|""".stripMargin)
val flinkConf = new Configuration()
- flinkConf.safeSet(
- ClientOptions.CLIENT_TIMEOUT,
- Duration.ofSeconds(cancelRequest.savePointTimeout))
doCancel(cancelRequest, flinkConf)
}
@@ -531,11 +525,11 @@ trait FlinkClientTrait extends Logger {
case (true, false) =>
clientWrapper
.cancelWithSavepoint(jobID, savePointDir)
- .get(cancelRequest.savePointTimeout, TimeUnit.SECONDS)
+ .get()
case (_, _) =>
clientWrapper
.stopWithSavepoint(jobID, cancelRequest.withDrain, savePointDir)
- .get(cancelRequest.savePointTimeout, TimeUnit.SECONDS)
+ .get()
}
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 27f5fe7a5..2dd756b8f 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -19,7 +19,6 @@ package org.apache.streampark.flink.client.`trait`
import org.apache.streampark.common.enums.{ExecutionMode, FlinkK8sRestExposedType}
import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.kubernetes.IngressController
import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
import org.apache.commons.lang3.StringUtils