This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch job-state in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 76a1b28156621d1fab538009f9516377298ebe0f Author: benjobs <[email protected]> AuthorDate: Fri Nov 10 13:41:41 2023 +0800 [Bug] deploy flink job on yarn, get state bug fixed. --- .../apache/streampark/common/util/YarnUtils.scala | 11 ++-- .../core/service/impl/AppBuildPipeServiceImpl.java | 29 ++++++++-- .../console/core/task/FlinkRESTAPIWatcher.java | 63 ++++++++++------------ .../flink/client/trait/FlinkClientTrait.scala | 26 ++++----- 4 files changed, 68 insertions(+), 61 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala index 83db1d78c..d20b7bb6e 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala @@ -30,6 +30,7 @@ import org.apache.http.client.methods.HttpGet import org.apache.http.client.protocol.HttpClientContext import org.apache.http.impl.client.HttpClients +import java.io.IOException import java.net.InetAddress import java.security.PrivilegedExceptionAction import java.util @@ -257,20 +258,19 @@ object YarnUtils extends Logger { * url * @return */ + @throws[IOException] def restRequest(url: String): String = { if (url == null) return null - url match { case u if u.matches("^http(|s)://.*") => Try(request(url)) match { case Success(v) => v case Failure(e) => if (hasYarnHttpKerberosAuth) { - logError(s"yarnUtils authRestRequest error, url: $u, detail: $e") + throw new IOException(s"yarnUtils authRestRequest error, url: $u, detail: $e") } else { - logError(s"yarnUtils restRequest error, url: $u, detail: $e") + throw new IOException(s"yarnUtils restRequest error, url: $u, detail: $e") } - null } case _ => Try(request(s"${getRMWebAppURL()}/$url")) match { @@ -281,8 +281,7 @@ object YarnUtils extends Logger { } match { case Success(v) => v case Failure(e) => - logError(s"yarnUtils restRequest retry 5 times all failed. detail: $e") - null + throw new IOException(s"yarnUtils restRequest retry 5 times all failed. detail: $e") } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 07c66ff05..3de46e7a6 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -71,6 +71,7 @@ import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sSessionBuildPipe import org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline; import org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.collections.CollectionUtils; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -86,6 +87,9 @@ import org.springframework.transaction.annotation.Transactional; import javax.annotation.Nonnull; import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -164,7 +168,7 @@ public class AppBuildPipeServiceImpl pipeline.registerWatcher( new PipeWatcher() { @Override - public void onStart(PipeSnapshot snapshot) { + public void onStart(PipeSnapshot snapshot) throws Exception { AppBuildPipeline buildPipeline = AppBuildPipeline.fromPipeSnapshot(snapshot).setAppId(app.getId()); saveEntity(buildPipeline); @@ -364,7 +368,7 @@ public class AppBuildPipeServiceImpl } } - private void prepareJars(Application app) { + private void prepareJars(Application app) throws IOException { File localUploadDIR = new File(Workspace.local().APP_UPLOADS()); if (!localUploadDIR.exists()) { localUploadDIR.mkdirs(); @@ -421,7 +425,7 @@ public class AppBuildPipeServiceImpl if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) { List<File> jars = new ArrayList<>(0); - // 1) user jar + // 1). user jar jars.add(libJar); // 2). jar dependency @@ -434,9 +438,24 @@ public class AppBuildPipeServiceImpl jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts())); } + // 4). local uploadDIR to hdfs uploadsDIR + String hdfsUploadDIR = Workspace.remote().APP_UPLOADS(); + for (File jarFile : jars) { + String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName(); + if (!fsOperator.exists(hdfsUploadPath)) { + fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR); + } else { + InputStream inputStream = Files.newInputStream(jarFile.toPath()); + if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) { + fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR); + } + } + } + + // 5). copy jars to $hdfs_app_home/lib fsOperator.mkCleanDirs(app.getAppLib()); - // 4). upload jars to appLibDIR - jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib())); + jars.forEach( + jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib())); } } else { String appHome = app.getAppHome(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java index b734a7a03..a11451764 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java @@ -207,7 +207,6 @@ public class FlinkRESTAPIWatcher { STOP_FROM_MAP.getOrDefault(key, null) == null ? StopFrom.NONE : STOP_FROM_MAP.get(key); - final OptionState optionState = OPTIONING.get(key); try { // query status from flink rest api getFromFlinkRestApi(application, stopFrom); @@ -220,37 +219,36 @@ public class FlinkRESTAPIWatcher { Query from flink's restAPI and yarn's restAPI both failed. In this case, it is necessary to decide whether to return to the final state depending on the state being operated */ - if (optionState == null || !optionState.equals(OptionState.STARTING)) { - // non-mapping - if (application.getState() != FlinkAppState.MAPPING.getValue()) { - log.error( - "FlinkRESTAPIWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!"); - if (StopFrom.NONE.equals(stopFrom)) { - savePointService.expire(application.getId()); - application.setState(FlinkAppState.LOST.getValue()); - alertService.alert(application, FlinkAppState.LOST); - } else { - application.setState(FlinkAppState.CANCELED.getValue()); - } + // non-mapping + if (application.getState() != FlinkAppState.MAPPING.getValue()) { + log.error( + "FlinkRESTAPIWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!"); + if (StopFrom.NONE.equals(stopFrom)) { + savePointService.expire(application.getId()); + application.setState(FlinkAppState.LOST.getValue()); + alertService.alert(application, FlinkAppState.LOST); + } else { + application.setState(FlinkAppState.CANCELED.getValue()); } - /* - This step means that the above two ways to get information have failed, and this step is the last step, - which will directly identify the mission as cancelled or lost. - Need clean savepoint. - */ - application.setEndTime(new Date()); - cleanSavepoint(application); - cleanOptioning(optionState, key); - doPersistMetrics(application, true); - FlinkAppState appState = FlinkAppState.of(application.getState()); - if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) { - alertService.alert(application, FlinkAppState.of(application.getState())); - if (appState.equals(FlinkAppState.FAILED)) { - try { - applicationService.start(application, true); - } catch (Exception e) { - log.error(e.getMessage(), e); - } + } + /* + This step means that the above two ways to get information have failed, and this step is the last step, + which will directly identify the mission as cancelled or lost. + Need clean savepoint. + */ + application.setEndTime(new Date()); + cleanSavepoint(application); + OptionState optionState = OPTIONING.get(key); + cleanOptioning(optionState, key); + doPersistMetrics(application, true); + FlinkAppState appState = FlinkAppState.of(application.getState()); + if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) { + alertService.alert(application, FlinkAppState.of(application.getState())); + if (appState.equals(FlinkAppState.FAILED)) { + try { + applicationService.start(application, true); + } catch (Exception e) { + log.error(e.getMessage(), e); } } } @@ -738,9 +736,6 @@ public class FlinkRESTAPIWatcher { private <T> T yarnRestRequest(String url, Class<T> clazz) throws IOException { String result = YarnUtils.restRequest(url); - if (null == result) { - return null; - } return JacksonUtils.read(result, clazz); } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 58dc7f82b..4088d96cc 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -207,28 +207,22 @@ trait FlinkClientTrait extends Logger { jobGraphFunc(submitRequest, flinkConfig, jarFile) } match { case Failure(e) => - logWarn( - s"""\n - |[flink-submit] JobGraph Submit Plan failed, error detail: - |------------------------------------------------------------------ - |${Utils.stringifyException(e)} - |------------------------------------------------------------------ - |Now retry submit with RestAPI Plan ... - |""".stripMargin - ) Try(restApiFunc(submitRequest, flinkConfig, jarFile)) match { case Success(r) => r - case Failure(e) => - logError( + case Failure(e1) => + throw new RuntimeException( s"""\n - |[flink-submit] RestAPI Submit failed, error detail: + |[flink-submit] Both JobGraph submit plan and Rest API submit plan all failed! + |JobGraph submit plan failed detail: |------------------------------------------------------------------ |${Utils.stringifyException(e)} |------------------------------------------------------------------ - |Both JobGraph submit plan and Rest API submit plan all failed! - |""".stripMargin - ) - throw e + | + | RestAPI Submit failed, error detail: + | ------------------------------------------------------------------ + |${Utils.stringifyException(e1)} + |------------------------------------------------------------------ + |""".stripMargin) } case Success(v) => v }
