This is an automated email from the ASF dual-hosted git repository.
cancai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new c2b7d4d0a [ISSUE-3087][Improve] Improve streampark-flink module based
on [3.6 Control/Condition Statements] (#3645)
c2b7d4d0a is described below
commit c2b7d4d0a134bbf0d640e45b0507f76f5801738e
Author: xxt <[email protected]>
AuthorDate: Mon Apr 1 12:24:04 2024 +0800
[ISSUE-3087][Improve] Improve streampark-flink module based on [3.6
Control/Condition Statements] (#3645)
[ISSUE-3087][Improve] Improve streampark-flink module based on [3.6
Control/Condition Statements] (#3645)
---
.../impl/KubernetesNativeSessionClient.scala | 9 ++++---
.../flink/client/impl/YarnApplicationClient.scala | 15 ++++++------
.../flink/client/impl/YarnSessionClient.scala | 11 ++++-----
.../flink/client/trait/FlinkClientTrait.scala | 25 ++++++++++---------
.../client/trait/KubernetesNativeClientTrait.scala | 28 +++++++++++-----------
.../flink/kubernetes/PodTemplateParser.scala | 7 +++---
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 8 +++----
.../streampark/flink/packer/maven/MavenTool.scala | 17 ++++++-------
.../packer/pipeline/DockerResolveProgress.scala | 8 +++----
.../impl/FlinkK8sApplicationBuildPipeline.scala | 11 +++++----
.../flink/proxy/ChildFirstClassLoader.scala | 4 ++--
.../streampark/flink/core/SqlCommandParser.scala | 14 +++++------
12 files changed, 77 insertions(+), 80 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index e2684014e..de9a75dc6 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -235,11 +235,10 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
kubeClient =
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
- if (
- shutDownRequest.clusterId != null && kubeClientWrapper
- .getService(shutDownRequest.clusterId)
- .isPresent
- ) {
+ val stopAndCleanupState = shutDownRequest.clusterId != null &&
kubeClientWrapper
+ .getService(shutDownRequest.clusterId)
+ .isPresent
+ if (stopAndCleanupState) {
kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
ShutDownResponse(shutDownRequest.clusterId)
} else {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index ec0a8091b..b2d3c8499 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -144,13 +144,14 @@ object YarnApplicationClient extends YarnClientTrait {
flinkConfig: Configuration): SubmitResponse = {
var proxyUserUgi: UserGroupInformation =
UserGroupInformation.getCurrentUser
val currentUser = UserGroupInformation.getCurrentUser
- if (!HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
- if (StringUtils.isNotEmpty(submitRequest.hadoopUser)) {
- proxyUserUgi = UserGroupInformation.createProxyUser(
- submitRequest.hadoopUser,
- currentUser
- )
- }
+ val enableProxyState =
+ !HadoopUtils.isKerberosSecurityEnabled(currentUser) &&
StringUtils.isNotEmpty(
+ submitRequest.hadoopUser)
+ if (enableProxyState) {
+ proxyUserUgi = UserGroupInformation.createProxyUser(
+ submitRequest.hadoopUser,
+ currentUser
+ )
}
proxyUserUgi.doAs[SubmitResponse](new PrivilegedAction[SubmitResponse] {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index c91fc45f0..9aaa133fe 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -259,12 +259,11 @@ object YarnSessionClient extends YarnClientTrait {
flinkConfig.safeSet(DeploymentOptions.TARGET,
YarnDeploymentTarget.SESSION.getName)
val yarnClusterDescriptor = getSessionClusterDescriptor(flinkConfig)
clusterDescriptor = yarnClusterDescriptor._2
- if (
- FinalApplicationStatus.UNDEFINED.equals(
- clusterDescriptor.getYarnClient
-
.getApplicationReport(ApplicationId.fromString(shutDownRequest.clusterId))
- .getFinalApplicationStatus)
- ) {
+ val shutDownState = FinalApplicationStatus.UNDEFINED.equals(
+ clusterDescriptor.getYarnClient
+
.getApplicationReport(ApplicationId.fromString(shutDownRequest.clusterId))
+ .getFinalApplicationStatus)
+ if (shutDownState) {
val clientProvider =
clusterDescriptor.retrieve(yarnClusterDescriptor._1)
client = clientProvider.getClusterClient
client.shutDownCluster()
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index a31250d4c..5cf2a9d51 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -136,10 +136,9 @@ trait FlinkClientTrait extends Logger {
flinkConfig.setBoolean(
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
submitRequest.allowNonRestoredState)
- if (
- submitRequest.flinkVersion.checkVersion(
- FlinkRestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode
!= null
- ) {
+ val enableRestoreModeState = submitRequest.flinkVersion.checkVersion(
+ FlinkRestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode !=
null
+ if (enableRestoreModeState) {
flinkConfig.setString(FlinkRestoreMode.RESTORE_MODE,
submitRequest.restoreMode.getName);
}
}
@@ -473,17 +472,17 @@ trait FlinkClientTrait extends Logger {
}
// execution.runtime-mode
- if (submitRequest.properties.nonEmpty) {
- if
(submitRequest.properties.containsKey(ExecutionOptions.RUNTIME_MODE.key())) {
- programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
- programArgs +=
submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
- }
+ val addRuntimeModeState =
+ submitRequest.properties.nonEmpty &&
submitRequest.properties.containsKey(
+ ExecutionOptions.RUNTIME_MODE.key())
+ if (addRuntimeModeState) {
+ programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
+ programArgs +=
submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
}
- if (
- submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK
- && submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION
- ) {
+ val addUserJarFileState =
+ submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK &&
submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION
+ if (addUserJarFileState) {
// python file
programArgs.add("-py")
programArgs.add(submitRequest.userJarFile.getAbsolutePath)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 49958a20c..bbaf5e026 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -47,20 +47,20 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
covertToServiceExposedType(submitRequest.k8sSubmitParam.flinkRestExposedType.get))
- if (submitRequest.buildResult != null) {
- if (submitRequest.executionMode ==
FlinkExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
- val buildResult =
submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
- buildResult.podTemplatePaths.foreach(
- p => {
- if (PodTemplateTool.KUBERNETES_POD_TEMPLATE.key.equals(p._1)) {
-
flinkConfig.safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
- } else if
(PodTemplateTool.KUBERNETES_JM_POD_TEMPLATE.key.equals(p._1)) {
-
flinkConfig.safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
- } else if
(PodTemplateTool.KUBERNETES_TM_POD_TEMPLATE.key.equals(p._1)) {
-
flinkConfig.safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
- }
- })
- }
+ val addBuildParamState =
+ submitRequest.buildResult != null && submitRequest.executionMode ==
FlinkExecutionMode.KUBERNETES_NATIVE_APPLICATION
+ if (addBuildParamState) {
+ val buildResult =
submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
+ buildResult.podTemplatePaths.foreach(
+ p => {
+ if (PodTemplateTool.KUBERNETES_POD_TEMPLATE.key.equals(p._1)) {
+
flinkConfig.safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
+ } else if
(PodTemplateTool.KUBERNETES_JM_POD_TEMPLATE.key.equals(p._1)) {
+
flinkConfig.safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
+ } else if
(PodTemplateTool.KUBERNETES_TM_POD_TEMPLATE.key.equals(p._1)) {
+
flinkConfig.safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
+ }
+ })
}
// add flink conf configuration, mainly to set the log4j configuration
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateParser.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateParser.scala
index e66e7fe75..b515824ed 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateParser.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateParser.scala
@@ -70,10 +70,9 @@ object PodTemplateParser {
}))
}
- if (
- root.containsKey("spec")
- && Try(!root.get("spec").asInstanceOf[JMap[String,
Any]].isEmpty).getOrElse(false)
- ) {
+ val enableSpecState = root.containsKey("spec") && Try(
+ !root.get("spec").asInstanceOf[JMap[String,
Any]].isEmpty).getOrElse(false)
+ if (enableSpecState) {
res.put("spec", root.get("spec"))
}
yaml.dumpAsMap(res)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 927ffc01d..aa7d57b02 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -115,9 +115,9 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
case Some(jobState) =>
val trackId = id.copy(jobId = jobState.jobId)
val latest: JobStatusCV =
watchController.jobStatuses.get(trackId)
- if (
+ val updateState =
latest == null || latest.jobState != jobState.jobState ||
latest.jobId != jobState.jobId
- ) {
+ if (updateState) {
// put job status to cache
watchController.jobStatuses.put(trackId, jobState)
// set jobId to trackIds
@@ -327,9 +327,9 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
pollEmitTime = pollEmitTime,
pollAckTime = System.currentTimeMillis)
- if (
+ val updateJobState =
jobState == FlinkJobStateEnum.SILENT && latest != null &&
latest.jobState == FlinkJobStateEnum.SILENT
- ) {
+ if (updateJobState) {
Some(jobStatusCV.copy(pollEmitTime = latest.pollEmitTime, pollAckTime =
latest.pollAckTime))
} else {
Some(jobStatusCV)
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index c424b3beb..186d444cb 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -65,11 +65,11 @@ object MavenTool extends Logger {
"central",
Constant.DEFAULT,
InternalConfigHolder.get(MAVEN_REMOTE_URL))
- val remoteRepository =
- if (
+ val remoteRepository = {
+ val buildState =
InternalConfigHolder.get(MAVEN_AUTH_USER) == null ||
InternalConfigHolder.get(
MAVEN_AUTH_PASSWORD) == null
- ) {
+ if (buildState) {
builder.build()
} else {
val authentication = new AuthenticationBuilder()
@@ -78,6 +78,7 @@ object MavenTool extends Logger {
.build()
builder.setAuthentication(authentication).build()
}
+ }
List(remoteRepository)
}
@@ -253,11 +254,11 @@ object MavenTool extends Logger {
override def canFilter(jar: File): Boolean = true
override def isFiltered(name: String): Boolean = {
- if (name.startsWith("META-INF/")) {
- if (name.endsWith(".SF") || name.endsWith(".DSA") ||
name.endsWith(".RSA")) {
- logInfo(s"shade ignore file: $name")
- return true
- }
+ val isFilteredState = name.startsWith("META-INF/") &&
(name.endsWith(".SF") || name.endsWith(
+ ".DSA") || name.endsWith(".RSA"))
+ if (isFilteredState) {
+ logInfo(s"shade ignore file: $name")
+ return true
}
false
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
index eb9b402a7..9c8e94453 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
@@ -37,10 +37,10 @@ class DockerPullProgress(
var lastTime: Long) {
// noinspection DuplicatedCode
def update(pullRsp: PullResponseItem): Unit = {
- if (
+ val nonPullUpdateState =
pullRsp == null || StringUtils.isBlank(pullRsp.getId) ||
StringUtils.isBlank(
pullRsp.getStatus)
- ) {
+ if (nonPullUpdateState) {
return
}
if (pullRsp.getStatus.contains("complete")) {
@@ -79,10 +79,10 @@ class DockerPushProgress(
var lastTime: Long) {
// noinspection DuplicatedCode
def update(pushRsp: PushResponseItem): Unit = {
- if (
+ val nonPushUpdateState =
pushRsp == null || StringUtils.isBlank(pushRsp.getId) ||
StringUtils.isBlank(
pushRsp.getStatus)
- ) {
+ if (nonPushUpdateState) {
return
}
if (pushRsp.getStatus.contains("complete")) {
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index ba8af634b..25849fa32 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -138,10 +138,9 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
val pullImageCmd = {
// when the register address prefix is explicitly identified on
base image tag,
// the user's pre-saved docker register auth info would be used.
- if (
- dockerConf.registerAddress != null && !baseImageTag.startsWith(
- dockerConf.registerAddress)
- ) {
+ val pullImageCmdState = dockerConf.registerAddress != null &&
!baseImageTag.startsWith(
+ dockerConf.registerAddress)
+ if (pullImageCmdState) {
dockerClient.pullImageCmd(baseImageTag)
} else {
dockerClient.pullImageCmd(baseImageTag).withAuthConfig(dockerConf.toAuthConf)
@@ -229,7 +228,9 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
registerAddress: String,
imageNamespace: String): String = {
var tagName = if (tag.contains("/")) tag else s"$imageNamespace/$tag"
- if (StringUtils.isNotBlank(registerAddress) &&
!tagName.startsWith(registerAddress)) {
+ val addRegisterAddressState =
+ StringUtils.isNotBlank(registerAddress) &&
!tagName.startsWith(registerAddress)
+ if (addRegisterAddressState) {
tagName = s"$registerAddress/$tagName"
}
tagName.toLowerCase
diff --git
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
index 7db1fbe90..b6e9d7b57 100644
---
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
+++
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
@@ -101,9 +101,9 @@ class ChildFirstClassLoader(
if (urlClassLoaderResource != null && JAR_PROTOCOL ==
urlClassLoaderResource.getProtocol) {
val spec = urlClassLoaderResource.getFile
val filename = new File(spec.substring(0, spec.indexOf("!/"))).getName
- if (
+ val matchState =
FLINK_PATTERN.matcher(filename).matches &&
!flinkResourcePattern.matcher(filename).matches
- ) {
+ if (matchState) {
return null
}
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index 12a2fdc47..19ced0e3d 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -641,15 +641,13 @@ object SqlSplitter {
private[this] def isSingleLineComment(curChar: Char, nextChar: Char):
Boolean = {
var flag = false
for (singleCommentPrefix <- singleLineCommentPrefixList) {
- if (singleCommentPrefix.length == 1) {
- if (curChar == singleCommentPrefix.charAt(0)) {
+ singleCommentPrefix.length match {
+ case 1 if curChar == singleCommentPrefix.charAt(0) => flag = true
+ case 2
+ if curChar == singleCommentPrefix.charAt(0) && nextChar ==
singleCommentPrefix.charAt(
+ 1) =>
flag = true
- }
- }
- if (singleCommentPrefix.length == 2) {
- if (curChar == singleCommentPrefix.charAt(0) && nextChar ==
singleCommentPrefix.charAt(1)) {
- flag = true
- }
+ case _ =>
}
}
flag