[
https://issues.apache.org/jira/browse/FLINK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279485#comment-17279485
]
Jichao Wang edited comment on FLINK-21274 at 2/5/21, 9:07 AM:
--------------------------------------------------------------
I think FLINK-21008 is about the case where, while the JobManager is exiting, an
external source sends a forced exit signal to the JobManager, which may result
in {{stopClusterServices}} and {{cleanupDirectories}} not being executed.
My problem is different: even in the absence of an external interrupt signal,
the tasks in ioExecutor (for example, archiving job info) may not finish
executing before the process exits.
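The failure mode I am describing can be illustrated with a minimal, self-contained sketch (this is not Flink code; the class, method, and thread names are made up): a task submitted to an executor backed by daemon threads only finishes before JVM exit if some user thread waits for it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DaemonExecutorDemo {

    // Executor whose worker threads are daemon threads, similar in spirit
    // to Flink's ioExecutor (names here are illustrative).
    static String archiveBlocking() throws Exception {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "demo-io");
            t.setDaemon(true);
            return t;
        });
        try {
            Future<String> archive = ioExecutor.submit(() -> {
                Thread.sleep(200); // simulate a slow HDFS write
                return "archived";
            });
            // If the last user thread returned here without waiting, the JVM
            // could exit before the daemon thread finishes the task. Blocking
            // on the result keeps the main (user) thread alive until it is done.
            return archive.get();
        } finally {
            ioExecutor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(archiveBlocking());
    }
}
```

Without the `archive.get()` call the JVM is free to exit as soon as `main` returns, because a daemon worker thread does not keep the JVM alive.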
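The parallel shutdown race can also be sketched: several "services" shut down asynchronously on daemon pool threads, and only a blocking wait from a user thread guarantees they all finish. This is a hypothetical sketch, not Flink's actual shutdown code; the service names only mirror those mentioned in the issue.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelShutdownDemo {

    // Three "services" shutting down asynchronously and in parallel,
    // loosely modeled on stopClusterServices (names are illustrative).
    static int shutdownAll() throws Exception {
        AtomicInteger completed = new AtomicInteger();

        // runAsync uses ForkJoinPool.commonPool(), whose worker threads are
        // daemon threads - so without the get() below, the JVM could exit
        // before all three shutdown actions have run.
        CompletableFuture<Void> ioExecutor = CompletableFuture.runAsync(completed::incrementAndGet);
        CompletableFuture<Void> metricRegistry = CompletableFuture.runAsync(completed::incrementAndGet);
        CompletableFuture<Void> rpcService = CompletableFuture.runAsync(completed::incrementAndGet);

        // Waiting on the combined future from a user thread guarantees that
        // every shutdown action finishes before the process exits.
        CompletableFuture.allOf(ioExecutor, metricRegistry, rpcService).get();
        return completed.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(shutdownAll());
    }
}
```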
> In per-job mode, if the HDFS write is slow (about 5 seconds), the upload of
> the Flink job archive file fails
> --------------------------------------------------------------------------------------------------------
>
> Key: FLINK-21274
> URL: https://issues.apache.org/jira/browse/FLINK-21274
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.10.1
> Reporter: Jichao Wang
> Priority: Critical
> Attachments: 1.png, 2.png, Add wait 5 seconds in
> org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log, Not add wait
> 5 seconds.log, application_1612404624605_0010-JobManager.log
>
>
> This is part of the configuration of my Flink History Server
> (flink-conf.yaml); the same settings are used by my Flink client.
> {code:java}
> jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> {code}
> I used {color:#0747a6}flink run -m yarn-cluster
> /cloud/service/flink/examples/batch/WordCount.jar{color} to submit a
> WordCount job to the Yarn cluster. Under normal circumstances, after the job
> completes, the Flink job execution information is archived to HDFS and then
> the JobManager process exits. However, when this archiving step takes a long
> time (for example, because the HDFS write speed is slow), the upload of the
> job archive file fails.
> The issue can be reproduced as follows:
> Modify the
> {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
> method to wait 5 seconds before actually writing to HDFS (simulating a
> slow-write scenario).
> {code:java}
> public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
>         throws IOException {
>     try {
>         FileSystem fs = rootPath.getFileSystem();
>         Path path = new Path(rootPath, jobId.toString());
>         OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
>
>         try {
>             LOG.info("===========================Wait 5 seconds..");
>             Thread.sleep(5000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>
>         try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
>             ... // Part of the code is omitted here
>         } catch (Exception e) {
>             fs.delete(path, false);
>             throw e;
>         }
>         LOG.info("Job {} has been archived at {}.", jobId, path);
>         return path;
>     } catch (IOException e) {
>         LOG.error("Failed to archive job.", e);
>         throw e;
>     }
> }
> {code}
> After making the above change, I could not find the corresponding job on
> Flink's HistoryServer (refer to Figure 1.png and Figure 2.png).
> I then browsed the JobManager log on Yarn (see attachment
> application_1612404624605_0010-JobManager.log for details) and found that
> the following line is missing from the log:
> {code:java}
> INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process
> YarnJobClusterEntrypoint with exit code 0.{code}
> Usually, when the process exits normally, a log line like this is printed
> just before {color:#0747a6}System.exit(returnCode){color} is executed.
> Since no exception appears in the JobManager log, the situation above
> indicates that at some point there was no user thread left in the JobManager
> process, which caused the JVM to exit before the normal shutdown sequence
> completed.
> Eventually I found that multiple services (for example ioExecutor,
> metricRegistry, commonRpcService) are shut down asynchronously in
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color},
> and that the shutdown() method of metricRegistry in turn shuts down further
> services (for example its executor). These shutdown actions run
> asynchronously and in parallel. If ioExecutor or that executor is the last
> to exit, the problem above occurs. The root cause is that the threads in
> these two executors are daemon threads, while the Akka actor threads are
> user threads.
> I would like to fix this bug with the change below. If it is confirmed that
> this is a bug (it affects all versions from 1.9 onward), please assign the
> ticket to me, thank you.
> Only the
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color}
> method needs to be modified.
> After fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>     final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
>     try {
>         clusterEntrypoint.startCluster();
>     } catch (ClusterEntrypointException e) {
>         LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
>         System.exit(STARTUP_FAILURE_RETURN_CODE);
>     }
>
>     int returnCode;
>     Throwable throwable = null;
>     try {
>         returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
>     } catch (Throwable e) {
>         throwable = e;
>         returnCode = RUNTIME_FAILURE_RETURN_CODE;
>     }
>
>     LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
>     System.exit(returnCode);
> }
> {code}
> Before fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>     final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
>     try {
>         clusterEntrypoint.startCluster();
>     } catch (ClusterEntrypointException e) {
>         LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
>         System.exit(STARTUP_FAILURE_RETURN_CODE);
>     }
>
>     clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
>         final int returnCode;
>         if (throwable != null) {
>             returnCode = RUNTIME_FAILURE_RETURN_CODE;
>         } else {
>             returnCode = applicationStatus.processExitCode();
>         }
>
>         LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
>         System.exit(returnCode);
>     });
> }
> {code}
> The purpose of the modification is to ensure that the main thread exits last.
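The effect of the proposed change can be sketched with a stand-in for the termination future (a hypothetical example, not Flink code): blocking on {{get()}} keeps the main user thread alive until the future completes, whereas a {{whenComplete}} callback leaves no user thread waiting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class MainThreadWaitsDemo {

    // Stand-in for clusterEntrypoint.getTerminationFuture(): it completes on
    // a daemon pool thread with a pretend process exit code (illustrative).
    static int awaitExitCode() throws Exception {
        CompletableFuture<Integer> terminationFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // pretend shutdown work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 0;
        });

        // Before the fix, the exit code was handled in a whenComplete callback
        // and the main thread fell through immediately, leaving only daemon
        // threads. After the fix, the main (user) thread blocks here until the
        // termination future completes, so it is the last thread to exit.
        return terminationFuture.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("exit code " + awaitExitCode());
    }
}
```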
--
This message was sent by Atlassian Jira
(v8.3.4#803005)