This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new a672ffe27 YarnSessionClient code optimization (#2911)
a672ffe27 is described below
commit a672ffe278bf5be8183433f8dc80b818821f3fa1
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon Jul 31 20:43:53 2023 +0800
YarnSessionClient code optimization (#2911)
* YarnSessionClient code optimization
---
.../org/apache/streampark/flink/client/impl/YarnSessionClient.scala | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index fbcb38c4c..40a278e40 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.JobID
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
@@ -37,7 +38,6 @@ import java.util
import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions._
-import scala.collection.mutable.ListBuffer
/** Submit Job to YARN Session Cluster */
object YarnSessionClient extends YarnClientTrait {
@@ -207,11 +207,11 @@ object YarnSessionClient extends YarnClientTrait {
deployClusterConfig(deployRequest, flinkConfig)
val yarnClusterDescriptor = getSessionClusterDeployDescriptor(flinkConfig)
clusterDescriptor = yarnClusterDescriptor._2
- if (null != deployRequest.clusterId && deployRequest.clusterId.nonEmpty) {
+ if (StringUtils.isNotBlank(deployRequest.clusterId)) {
try {
val applicationStatus =
clusterDescriptor.getYarnClient
- .getApplicationReport(ConverterUtils.toApplicationId(deployRequest.clusterId))
+ .getApplicationReport(ApplicationId.fromString(deployRequest.clusterId))
.getFinalApplicationStatus
if (FinalApplicationStatus.UNDEFINED.equals(applicationStatus)) {
// application is running