This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch job-state
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 76a1b28156621d1fab538009f9516377298ebe0f
Author: benjobs <[email protected]>
AuthorDate: Fri Nov 10 13:41:41 2023 +0800

    [Bug] Fix getting job state when deploying a Flink job on YARN.
---
 .../apache/streampark/common/util/YarnUtils.scala  | 11 ++--
 .../core/service/impl/AppBuildPipeServiceImpl.java | 29 ++++++++--
 .../console/core/task/FlinkRESTAPIWatcher.java     | 63 ++++++++++------------
 .../flink/client/trait/FlinkClientTrait.scala      | 26 ++++-----
 4 files changed, 68 insertions(+), 61 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 83db1d78c..d20b7bb6e 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -30,6 +30,7 @@ import org.apache.http.client.methods.HttpGet
 import org.apache.http.client.protocol.HttpClientContext
 import org.apache.http.impl.client.HttpClients
 
+import java.io.IOException
 import java.net.InetAddress
 import java.security.PrivilegedExceptionAction
 import java.util
@@ -257,20 +258,19 @@ object YarnUtils extends Logger {
    *   url
    * @return
    */
+  @throws[IOException]
   def restRequest(url: String): String = {
     if (url == null) return null
-
     url match {
       case u if u.matches("^http(|s)://.*") =>
         Try(request(url)) match {
           case Success(v) => v
           case Failure(e) =>
             if (hasYarnHttpKerberosAuth) {
-              logError(s"yarnUtils authRestRequest error, url: $u, detail: $e")
+              throw new IOException(s"yarnUtils authRestRequest error, url: 
$u, detail: $e")
             } else {
-              logError(s"yarnUtils restRequest error, url: $u, detail: $e")
+              throw new IOException(s"yarnUtils restRequest error, url: $u, 
detail: $e")
             }
-            null
         }
       case _ =>
         Try(request(s"${getRMWebAppURL()}/$url")) match {
@@ -281,8 +281,7 @@ object YarnUtils extends Logger {
             } match {
               case Success(v) => v
               case Failure(e) =>
-                logError(s"yarnUtils restRequest retry 5 times all failed. 
detail: $e")
-                null
+                throw new IOException(s"yarnUtils restRequest retry 5 times 
all failed. detail: $e")
             }
         }
     }
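
With this change restRequest no longer logs and returns null on failure; it throws
IOException. A minimal caller sketch under the new contract (the object wrapper, the
sample URL, and the error handling are illustrative, assuming streampark-common is
on the classpath):

import java.io.IOException

import scala.util.{Failure, Success, Try}

object RestRequestCaller {
  // restRequest now throws IOException instead of returning null, so the
  // failure is handled explicitly rather than via a null check.
  def fetchClusterApps(): Option[String] =
    Try(org.apache.streampark.common.util.YarnUtils.restRequest("ws/v1/cluster/apps")) match {
      case Success(body) => Option(body)
      case Failure(e: IOException) =>
        System.err.println(s"YARN REST request failed: ${e.getMessage}")
        None
      case Failure(e) => throw e
    }
}
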
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 07c66ff05..3de46e7a6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -71,6 +71,7 @@ import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sSessionBuildPipe
 import org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline;
 import org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.collections.CollectionUtils;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -86,6 +87,9 @@ import org.springframework.transaction.annotation.Transactional;
 import javax.annotation.Nonnull;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -164,7 +168,7 @@ public class AppBuildPipeServiceImpl
     pipeline.registerWatcher(
         new PipeWatcher() {
           @Override
-          public void onStart(PipeSnapshot snapshot) {
+          public void onStart(PipeSnapshot snapshot) throws Exception {
             AppBuildPipeline buildPipeline =
                 AppBuildPipeline.fromPipeSnapshot(snapshot).setAppId(app.getId());
             saveEntity(buildPipeline);
@@ -364,7 +368,7 @@ public class AppBuildPipeServiceImpl
     }
   }
 
-  private void prepareJars(Application app) {
+  private void prepareJars(Application app) throws IOException {
     File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
     if (!localUploadDIR.exists()) {
       localUploadDIR.mkdirs();
@@ -421,7 +425,7 @@ public class AppBuildPipeServiceImpl
         if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
           List<File> jars = new ArrayList<>(0);
 
-          // 1) user jar
+          // 1). user jar
           jars.add(libJar);
 
           // 2). jar dependency
@@ -434,9 +438,24 @@
             jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()));
           }
 
+          // 4). local uploadDIR to hdfs uploadsDIR
+          String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
+          for (File jarFile : jars) {
+            String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
+            if (!fsOperator.exists(hdfsUploadPath)) {
+              fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+            } else {
+              InputStream inputStream = Files.newInputStream(jarFile.toPath());
+              if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
+                fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+              }
+            }
+          }
+
+          // 5). copy jars to $hdfs_app_home/lib
           fsOperator.mkCleanDirs(app.getAppLib());
-          // 4). upload jars to appLibDIR
-          jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib()));
+          jars.forEach(
+              jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib()));
         }
       } else {
         String appHome = app.getAppHome();
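
The new step 4). uploads each jar to the HDFS uploads directory only when it is
missing or its MD5 differs from the local copy, and step 5). then copies from HDFS
instead of re-uploading. A standalone sketch of the skip-if-unchanged check;
FsOperator's exact method signatures are assumed here, so the filesystem accessors
are passed in as functions:

import java.nio.file.{Files, Paths}

import org.apache.commons.codec.digest.DigestUtils

object UploadIfChanged {
  // Upload localPath into remoteDir only when the remote copy is missing
  // or its MD5 differs from the local file's MD5.
  def uploadIfChanged(
      localPath: String,
      remoteDir: String,
      remoteMd5: String => Option[String],        // stand-in for fsOperator.fileMd5
      upload: (String, String) => Unit): Unit = { // stand-in for fsOperator.upload
    val fileName = Paths.get(localPath).getFileName.toString
    val remotePath = s"$remoteDir/$fileName"
    val in = Files.newInputStream(Paths.get(localPath))
    val localMd5 =
      try DigestUtils.md5Hex(in)
      finally in.close() // close explicitly; the patch itself opens the stream inline
    if (!remoteMd5(remotePath).contains(localMd5)) {
      upload(localPath, remoteDir)
    }
  }
}
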
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index b734a7a03..a11451764 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -207,7 +207,6 @@ public class FlinkRESTAPIWatcher {
               STOP_FROM_MAP.getOrDefault(key, null) == null
                   ? StopFrom.NONE
                   : STOP_FROM_MAP.get(key);
-          final OptionState optionState = OPTIONING.get(key);
           try {
             // query status from flink rest api
             getFromFlinkRestApi(application, stopFrom);
@@ -220,37 +219,36 @@ public class FlinkRESTAPIWatcher {
                Query from flink's restAPI and yarn's restAPI both failed.
               In this case, it is necessary to decide whether to return to the final state depending on the state being operated
               */
-              if (optionState == null || !optionState.equals(OptionState.STARTING)) {
-                // non-mapping
-                if (application.getState() != FlinkAppState.MAPPING.getValue()) {
-                  log.error(
-                      "FlinkRESTAPIWatcher getFromFlinkRestApi and 
getFromYarnRestApi error,job failed,savePoint expired!");
-                  if (StopFrom.NONE.equals(stopFrom)) {
-                    savePointService.expire(application.getId());
-                    application.setState(FlinkAppState.LOST.getValue());
-                    alertService.alert(application, FlinkAppState.LOST);
-                  } else {
-                    application.setState(FlinkAppState.CANCELED.getValue());
-                  }
+              // non-mapping
+              if (application.getState() != FlinkAppState.MAPPING.getValue()) {
+                log.error(
+                    "FlinkRESTAPIWatcher getFromFlinkRestApi and 
getFromYarnRestApi error,job failed,savePoint expired!");
+                if (StopFrom.NONE.equals(stopFrom)) {
+                  savePointService.expire(application.getId());
+                  application.setState(FlinkAppState.LOST.getValue());
+                  alertService.alert(application, FlinkAppState.LOST);
+                } else {
+                  application.setState(FlinkAppState.CANCELED.getValue());
                 }
-                /*
-                 This step means that the above two ways to get information have failed, and this step is the last step,
-                 which will directly identify the mission as cancelled or lost.
-                 Need clean savepoint.
-                */
-                application.setEndTime(new Date());
-                cleanSavepoint(application);
-                cleanOptioning(optionState, key);
-                doPersistMetrics(application, true);
-                FlinkAppState appState = FlinkAppState.of(application.getState());
-                if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) {
-                  alertService.alert(application, FlinkAppState.of(application.getState()));
-                  if (appState.equals(FlinkAppState.FAILED)) {
-                    try {
-                      applicationService.start(application, true);
-                    } catch (Exception e) {
-                      log.error(e.getMessage(), e);
-                    }
+              }
+              /*
+               This step means that the above two ways to get information have failed, and this step is the last step,
+               which will directly identify the mission as cancelled or lost.
+               Need clean savepoint.
+              */
+              application.setEndTime(new Date());
+              cleanSavepoint(application);
+              OptionState optionState = OPTIONING.get(key);
+              cleanOptioning(optionState, key);
+              doPersistMetrics(application, true);
+              FlinkAppState appState = FlinkAppState.of(application.getState());
+              if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) {
+                alertService.alert(application, FlinkAppState.of(application.getState()));
+                if (appState.equals(FlinkAppState.FAILED)) {
+                  try {
+                    applicationService.start(application, true);
+                  } catch (Exception e) {
+                    log.error(e.getMessage(), e);
                   }
                 }
               }
@@ -738,9 +736,6 @@ public class FlinkRESTAPIWatcher {
 
  private <T> T yarnRestRequest(String url, Class<T> clazz) throws IOException {
     String result = YarnUtils.restRequest(url);
-    if (null == result) {
-      return null;
-    }
     return JacksonUtils.read(result, clazz);
   }
 
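
With restRequest now throwing, yarnRestRequest propagates IOException instead of
returning null, and the watcher above finalizes a job whenever both REST queries
fail, no longer gated on OptionState.STARTING. A condensed sketch of that
terminal-state decision (the enum and state strings are illustrative stand-ins for
StopFrom and FlinkAppState):

object WatcherFinalState {
  sealed trait StopFrom
  case object NoneStop extends StopFrom // job disappeared without a user-issued stop
  case object UserStop extends StopFrom // stop was requested through the console

  // Both the Flink REST API and the YARN REST API failed: decide the terminal
  // state. NoneStop means the job is lost (savepoint expired, alert fired);
  // UserStop means it was a normal cancellation.
  def finalState(stopFrom: StopFrom): String = stopFrom match {
    case NoneStop => "LOST"
    case UserStop => "CANCELED"
  }
}
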
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 58dc7f82b..4088d96cc 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -207,28 +207,22 @@ trait FlinkClientTrait extends Logger {
       jobGraphFunc(submitRequest, flinkConfig, jarFile)
     } match {
       case Failure(e) =>
-        logWarn(
-          s"""\n
-             |[flink-submit] JobGraph Submit Plan failed, error detail:
-             |------------------------------------------------------------------
-             |${Utils.stringifyException(e)}
-             |------------------------------------------------------------------
-             |Now retry submit with RestAPI Plan ...
-             |""".stripMargin
-        )
         Try(restApiFunc(submitRequest, flinkConfig, jarFile)) match {
           case Success(r) => r
-          case Failure(e) =>
-            logError(
+          case Failure(e1) =>
+            throw new RuntimeException(
               s"""\n
-                 |[flink-submit] RestAPI Submit failed, error detail:
+                 |[flink-submit] Both JobGraph submit plan and Rest API submit plan all failed!
+                 |JobGraph submit plan failed detail:
                  |------------------------------------------------------------------
                  |${Utils.stringifyException(e)}
                  |------------------------------------------------------------------
-                 |Both JobGraph submit plan and Rest API submit plan all failed!
-                 |""".stripMargin
-            )
-            throw e
+                 |
+                 | RestAPI Submit failed, error detail:
+                 | ------------------------------------------------------------------
+                 |${Utils.stringifyException(e1)}
+                 |------------------------------------------------------------------
+                 |""".stripMargin)
         }
       case Success(v) => v
     }
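
The submit path now raises one combined error instead of logging the JobGraph
failure and rethrowing only the REST failure. A self-contained sketch of that
fallback shape; the two plans are passed as thunks here since jobGraphFunc and
restApiFunc take more parameters in the trait:

import scala.util.{Failure, Success, Try}

object SubmitFallback {
  // Try the JobGraph submit plan first; on failure fall back to the REST API
  // plan; if both fail, throw a single exception carrying both error details.
  def submit[R](jobGraphPlan: () => R, restApiPlan: () => R): R =
    Try(jobGraphPlan()) match {
      case Success(r) => r
      case Failure(e) =>
        Try(restApiPlan()) match {
          case Success(r) => r
          case Failure(e1) =>
            throw new RuntimeException(
              s"""|[flink-submit] Both JobGraph and REST API submit plans failed.
                  |JobGraph failure: ${e.getMessage}
                  |REST API failure: ${e1.getMessage}""".stripMargin)
        }
    }
}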
