This is an automated email from the ASF dual-hosted git repository.
gongzhongqiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 0e646728d [Improve] deploy flink job on k8s timeout improvement (#3425)
0e646728d is described below
commit 0e646728ded7ecf457817659c9283ab994cbd93c
Author: benjobs <[email protected]>
AuthorDate: Sat Dec 23 21:54:09 2023 +0800
[Improve] deploy flink job on k8s timeout improvement (#3425)
* [Improve] k8s timeout improvement
* [Improve] maven build args check improvement
---------
Co-authored-by: benjobs <[email protected]>
---
.../streampark/console/core/entity/Project.java | 37 +++++++++++++++-------
.../client/tool/FlinkSessionClientHelper.scala | 8 ++---
.../flink/kubernetes/KubernetesRetriever.scala | 20 ++++++------
.../watcher/FlinkCheckpointWatcher.scala | 4 +--
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 4 +--
.../kubernetes/watcher/FlinkMetricsWatcher.scala | 8 ++---
6 files changed, 49 insertions(+), 32 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
index 882d94f19..3ccbdac92 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
@@ -46,6 +46,8 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Slf4j
@@ -210,39 +212,49 @@ public class Project implements Serializable {
StringBuilder cmdBuffer = new StringBuilder(mvn).append(" clean package -DskipTests ");
if (StringUtils.isNotBlank(this.buildArgs)) {
- List<String> dangerArgs = getDangerArgs(this.buildArgs);
- if (dangerArgs.isEmpty()) {
+ String dangerArgs = getDangerArgs(this.buildArgs);
+ if (dangerArgs == null) {
cmdBuffer.append(this.buildArgs.trim());
} else {
throw new IllegalArgumentException(
String.format(
- "Invalid build args, dangerous operation symbol detected: %s, in your buildArgs: %s",
- dangerArgs.stream().collect(Collectors.joining(",")), this.buildArgs));
+ "Invalid maven argument, dangerous args: %s, in your buildArgs: %s",
+ dangerArgs, this.buildArgs));
}
}
String setting = InternalConfigHolder.get(CommonConfig.MAVEN_SETTINGS_PATH());
if (StringUtils.isNotBlank(setting)) {
- List<String> dangerArgs = getDangerArgs(setting);
- if (dangerArgs.isEmpty()) {
+ String dangerArgs = getDangerArgs(setting);
+ if (dangerArgs == null) {
File file = new File(setting);
if (file.exists() && file.isFile()) {
cmdBuffer.append(" --settings ").append(setting);
} else {
throw new IllegalArgumentException(
- String.format("Invalid maven setting path, %s no exists or not file", setting));
+ String.format("Invalid maven-setting file path, %s no exists or not file", setting));
}
} else {
throw new IllegalArgumentException(
String.format(
- "Invalid maven setting path, dangerous operation symbol detected: %s, in your maven setting path: %s",
- dangerArgs.stream().collect(Collectors.joining(",")), setting));
+ "Invalid maven-setting file path, dangerous args: %s, in your maven setting path: %s",
+ dangerArgs, setting));
}
}
return cmdBuffer.toString();
}
- private List<String> getDangerArgs(String param) {
+ private String getDangerArgs(String param) {
+ Pattern pattern = Pattern.compile("(`.*?`)|(\\$\\((.*?)\\))");
+ Matcher matcher = pattern.matcher(param);
+ if (matcher.find()) {
+ String dangerArgs = matcher.group(1);
+ if (dangerArgs == null) {
+ dangerArgs = matcher.group(2);
+ }
+ return dangerArgs;
+ }
+
String[] args = param.split("\\s+");
List<String> dangerArgs = new ArrayList<>();
for (String arg : args) {
@@ -263,7 +275,10 @@ public class Project implements Serializable {
}
}
}
- return dangerArgs;
+ if (!dangerArgs.isEmpty()) {
+ return dangerArgs.stream().collect(Collectors.joining(","));
+ }
+ return null;
}
@JsonIgnore
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
index e7d673628..63d5e8c64 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
@@ -59,8 +59,8 @@ object FlinkSessionSubmitHelper extends Logger {
// upload flink-job jar
val uploadResult = Request
.post(s"$jmRestUrl/jars/upload")
- .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
- .responseTimeout(Timeout.ofSeconds(60))
+ .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+ .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.body(
MultipartEntityBuilder
.create()
@@ -90,8 +90,8 @@ object FlinkSessionSubmitHelper extends Logger {
// refer to https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/rest_api/#jars-upload
val resp = Request
.post(s"$jmRestUrl/jars/${jarUploadResponse.jarId}/run")
- .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
- .responseTimeout(Timeout.ofSeconds(60))
+ .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+ .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.body(new StringEntity(Serialization.write(new JarRunRequest(flinkConfig))))
.execute
.returnContent()
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index d10f81b9f..916096f25 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -31,22 +31,22 @@ import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{Configuration, DeploymentOptions, RestOptions}
import org.apache.flink.kubernetes.KubernetesClusterDescriptor
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
+import org.apache.hc.core5.util.Timeout
import javax.annotation.Nullable
-import java.time.Duration
-
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
object KubernetesRetriever extends Logger {
// see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
- val FLINK_CLIENT_TIMEOUT_SEC = 60L
+ val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
+ Timeout.ofMilliseconds(ClientOptions.CLIENT_TIMEOUT.defaultValue().toMillis)
+
// see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
- val FLINK_REST_AWAIT_TIMEOUT_SEC = 30L
- // see org.apache.flink.configuration.RestOptions.RETRY_MAX_ATTEMPTS
- val FLINK_REST_RETRY_MAX_ATTEMPTS = 30
+ val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
+ Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
/** get new KubernetesClient */
@throws(classOf[KubernetesClientException])
@@ -70,9 +70,11 @@ object KubernetesRetriever extends Logger {
val flinkConfig = new Configuration()
flinkConfig.setString(DeploymentOptions.TARGET, executeMode.toString)
flinkConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId)
- flinkConfig.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofSeconds(FLINK_CLIENT_TIMEOUT_SEC))
- flinkConfig.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, FLINK_REST_AWAIT_TIMEOUT_SEC * 1000)
- flinkConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, FLINK_REST_RETRY_MAX_ATTEMPTS)
+ flinkConfig.set(ClientOptions.CLIENT_TIMEOUT, ClientOptions.CLIENT_TIMEOUT.defaultValue())
+ flinkConfig.set(
+ RestOptions.AWAIT_LEADER_TIMEOUT,
+ RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
+ flinkConfig.set(RestOptions.RETRY_MAX_ATTEMPTS, RestOptions.RETRY_MAX_ATTEMPTS.defaultValue())
if (Try(namespace.isEmpty).getOrElse(true)) {
flinkConfig.setString(
KubernetesConfigOptions.NAMESPACE,
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index 5ac00db54..a35da009c 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -115,8 +115,8 @@ class FlinkCheckpointWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.def
Checkpoint.as(
Request
.get(s"$flinkJmRestUrl/jobs/${trackId.jobId}/checkpoints")
- .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
- .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+ .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+ .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent
.asString(StandardCharsets.UTF_8)) match {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 7dbfcfd0a..927ffc01d 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -266,8 +266,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
val jobDetails = JobDetails.as(
Request
.get(s"$restUrl/jobs/overview")
- .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
- .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+ .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+ .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent()
.asString(StandardCharsets.UTF_8))
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 8fc80a33a..7a57063bd 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -127,8 +127,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
.as(
Request
.get(s"$flinkJmRestUrl/overview")
- .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
- .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+ .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+ .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent
.asString(StandardCharsets.UTF_8))
@@ -140,8 +140,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
.as(
Request
.get(s"$flinkJmRestUrl/jobmanager/config")
- .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
- .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+ .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+ .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent
.asString(StandardCharsets.UTF_8))