[
https://issues.apache.org/jira/browse/FLINK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279454#comment-17279454
]
Jichao Wang edited comment on FLINK-21274 at 2/5/21, 8:26 AM:
--------------------------------------------------------------
I think my problem has nothing to do with HA, because the job I submitted was
the WordCount example provided by Flink, and HA was not enabled.
Let me try to explain further:
I added some logging that prints thread information to
org.apache.flink.runtime.entrypoint.ClusterEntrypoint in order to capture the
state of the JobManager just before it exits. The code changes are as follows:
// code1
{code:java}
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
    final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
    try {
        clusterEntrypoint.startCluster();
    } catch (ClusterEntrypointException e) {
        LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }

    // code1-1
    clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
        final int returnCode;
        LOG.info("================clusterEntrypoint.getTerminationFuture isTerminal======================================");
        LOG.info("===Current thread name is: " + Thread.currentThread().getName()
                + ", isDaemon: " + Thread.currentThread().isDaemon());
        printThreads();
        if (throwable != null) {
            returnCode = RUNTIME_FAILURE_RETURN_CODE;
        } else {
            returnCode = applicationStatus.processExitCode();
        }
        LOG.info("Terminating cluster entrypoint process {} with exit code {}.",
                clusterEntrypointName, returnCode, throwable);
        System.exit(returnCode);
    });
}{code}
// code2
{code:java}
private void cleanupDirectories() throws IOException {
    LOG.info("===================Starting cleanupDirectories================================");
    LOG.info("cleanupDirectories===Current thread name is: " + Thread.currentThread().getName()
            + ", isDaemon: " + Thread.currentThread().isDaemon());
    printThreads("cleanupDirectories");

    ShutdownHookUtil.removeShutdownHook(shutDownHook, getClass().getSimpleName(), LOG);

    final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);
    FileUtils.deleteDirectory(new File(webTmpDir));
}
{code}
// code3
{code:java}
protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
    final long shutdownTimeout = configuration.getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT);

    synchronized (lock) {
        Throwable exception = null;

        final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

        if (blobServer != null) {
            try {
                blobServer.close();
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }
        }

        if (haServices != null) {
            try {
                if (cleanupHaData) {
                    haServices.closeAndCleanupAllData();
                } else {
                    haServices.close();
                }
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }
        }

        if (archivedExecutionGraphStore != null) {
            try {
                archivedExecutionGraphStore.close();
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }
        }

        if (processMetricGroup != null) {
            processMetricGroup.close();
        }

        if (metricRegistry != null) {
            LOG.info("===metricRegistry is not null");
            CompletableFuture<Void> futureMetricRegistry = metricRegistry.shutdown();
            terminationFutures.add(futureMetricRegistry);
            futureMetricRegistry.whenComplete((aVoid, throwable) -> {
                // code3-1
                LOG.info("========================metricRegistry shutdowns successfully===================================");
                LOG.info("metricRegistry===Current thread name is: " + Thread.currentThread().getName()
                        + ", isDaemon: " + Thread.currentThread().isDaemon());
                printThreads("metricRegistry");
            });
        }

        if (ioExecutor != null) {
            LOG.info("===ioExecutor is not null");
            CompletableFuture<Void> futureIoExecutor =
                    ExecutorUtils.nonBlockingShutdown(shutdownTimeout, TimeUnit.MILLISECONDS, ioExecutor);
            terminationFutures.add(futureIoExecutor);
            futureIoExecutor.whenComplete((aVoid, throwable) -> {
                // code3-2
                LOG.info("==============ioExecutor shutdowns successfully==========================");
                LOG.info("ioExecutor===Current thread name is: " + Thread.currentThread().getName()
                        + ", isDaemon: " + Thread.currentThread().isDaemon());
                printThreads("ioExecutor");
            });
        }

        if (commonRpcService != null) {
            LOG.info("===commonRpcService is not null");
            CompletableFuture<Void> futureCommonRpcService = commonRpcService.stopService();
            terminationFutures.add(futureCommonRpcService);
            futureCommonRpcService.whenComplete((aVoid, throwable) -> {
                // code3-3
                LOG.info("============================commonRpcService shutdowns successfully=====================================");
                LOG.info("commonRpcService===Current thread name is: " + Thread.currentThread().getName()
                        + ", isDaemon: " + Thread.currentThread().isDaemon());
                printThreads("commonRpcService");
            });
        }

        if (exception != null) {
            terminationFutures.add(FutureUtils.completedExceptionally(exception));
        }

        return FutureUtils.completeAll(terminationFutures);
    }
}{code}
// code4
{code:java}
public static void printThreads(String name) {
    ThreadGroup group = Thread.currentThread().getThreadGroup();
    ThreadGroup topGroup = group;
    while (group != null) {
        topGroup = group;
        group = group.getParent();
    }
    int slackSize = topGroup.activeCount() * 2;
    Thread[] slackThreads = new Thread[slackSize];
    int actualSize = topGroup.enumerate(slackThreads);
    Thread[] actualThreads = new Thread[actualSize];
    System.arraycopy(slackThreads, 0, actualThreads, 0, actualSize);
    LOG.info(name + "===Threads size is " + actualThreads.length);
    for (Thread thread : actualThreads) {
        LOG.info(name + "===Thread name : " + thread.getName() + ", isDaemon: " + thread.isDaemon());
    }
}{code}
Description of the attached log files:
Add wait 5 seconds in org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log
This log file corresponds to code5: a 5-second wait was added to the
org.apache.flink.runtime.history.FsJobArchivist#archiveJob method.
Not add wait 5 seconds.log
This log file corresponds to code6: the 5-second wait was removed from the
org.apache.flink.runtime.history.FsJobArchivist#archiveJob method.
// code5
{code:java}
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
        throws IOException {
    try {
        FileSystem fs = rootPath.getFileSystem();
        Path path = new Path(rootPath, jobId.toString());
        OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

        try {
            LOG.info("===========================Wait 5 seconds..");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            ... // Part of the code is omitted here
        } catch (Exception e) {
            fs.delete(path, false);
            throw e;
        }
        LOG.info("Job {} has been archived at {}.", jobId, path);
        return path;
    } catch (IOException e) {
        LOG.error("Failed to archive job.", e);
        throw e;
    }
}{code}
// code6
{code:java}
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
        throws IOException {
    try {
        FileSystem fs = rootPath.getFileSystem();
        Path path = new Path(rootPath, jobId.toString());
        OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

        // There is no logic to wait for 5 seconds here

        try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            ... // Part of the code is omitted here
        } catch (Exception e) {
            fs.delete(path, false);
            throw e;
        }
        LOG.info("Job {} has been archived at {}.", jobId, path);
        return path;
    } catch (IOException e) {
        LOG.error("Failed to archive job.", e);
        throw e;
    }
}{code}
I believe the thread information printed in code1-1 and code2 is the thread
information just before the JobManager process exits.
With the 5-second wait added, code3-2 and code1-1 are never executed, which
means ioExecutor fails to exit normally; code2 is executed by the
YarnJobClusterEntrypoint shutdown hook.
With the 5-second wait removed, the program exits normally and code1-1 is
executed by the flink-akka.actor.default-dispatcher-16 thread. There is also a
corresponding case: if metricRegistry exits after commonRpcService, code1-1 is
executed by flink-metrics-scheduler-1.
The JobManager shuts down multiple thread pools in parallel before the process
exits, and the order in which they finish determines which thread executes
code1-1. If a daemon thread pool (for example ioExecutor) finishes last,
code1-1 is never executed, because the JVM terminates as soon as only daemon
threads remain. This is the root cause of the problem.
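To make the race concrete, here is a minimal standalone sketch (my own
illustration, not Flink code; the class, executor, and thread names are made
up) of the same effect: a callback registered on a future that is completed by
a daemon thread never runs if no user thread is left alive.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DaemonShutdownRace {

    public static void main(String[] args) {
        // Executor backed by a single daemon thread, playing the role of ioExecutor.
        ExecutorService daemonExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "daemon-io");
            t.setDaemon(true);
            return t;
        });

        // Future completed on the daemon thread after a delay (stands in for the
        // slow archive write plus asynchronous service shutdown).
        CompletableFuture<Void> termination = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, daemonExecutor);

        // Analogous to code1-1: the callback runs on whichever thread completes the future.
        termination.whenComplete((ignored, throwable) ->
                System.out.println("callback ran on " + Thread.currentThread().getName()));

        // The main thread returns here. Only daemon threads remain, so the JVM exits
        // before the 5 seconds elapse and the callback above never runs. Blocking on
        // termination.join() here instead would keep a user thread alive until the
        // callback has run - the same idea as the proposed runClusterEntrypoint fix.
    }
}{code}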
If the org.apache.flink.runtime.util.ExecutorThreadFactory#newThread method of
the class backing ioExecutor is changed from code7 to code8, then even if we
wait 10 seconds in code5, code1-1 is still executed. In that case the thread
executing code1-1 is ForkJoinPool.commonPool-worker-57.
// code7
{code:java}
@Override
public Thread newThread(Runnable runnable) {
    Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement());
    t.setDaemon(true);
    t.setPriority(threadPriority);
    // optional handler for uncaught exceptions
    if (exceptionHandler != null) {
        t.setUncaughtExceptionHandler(exceptionHandler);
    }
    return t;
}{code}
// code8
{code:java}
@Override
public Thread newThread(Runnable runnable) {
    Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement());
    t.setDaemon(false);
    t.setPriority(threadPriority);
    // optional handler for uncaught exceptions
    if (exceptionHandler != null) {
        t.setUncaughtExceptionHandler(exceptionHandler);
    }
    return t;
}{code}
What do you think?
> In per-job mode, if the HDFS write is slow (about 5 seconds), the Flink job
> archive file upload will fail
> --------------------------------------------------------------------------------------------------------
>
> Key: FLINK-21274
> URL: https://issues.apache.org/jira/browse/FLINK-21274
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.10.1
> Reporter: Jichao Wang
> Priority: Critical
> Attachments: 1.png, 2.png,
> application_1612404624605_0010-JobManager.log
>
>
> This is part of the configuration of my Flink History Server (flink-conf.yaml);
> my Flink client uses the same configuration.
> {code:java}
> jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> {code}
> I used {color:#0747a6}flink run -m yarn-cluster
> /cloud/service/flink/examples/batch/WordCount.jar{color} to submit a
> WordCount job to the Yarn cluster. Under normal circumstances, after the
> job completes, the Flink job execution information is archived to HDFS and
> then the JobManager process exits. However, when this archiving process
> takes a long time (perhaps because the HDFS write speed is slow), the upload
> of the job archive file fails.
> The specific reproduction method is as follows:
> Modify the
> {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
> method to wait 5 seconds before actually writing to HDFS (simulating a slow
> write speed scenario).
> {code:java}
> public static Path archiveJob(Path rootPath, JobID jobId,
> Collection<ArchivedJson> jsonToArchive)
> throws IOException {
> try {
> FileSystem fs = rootPath.getFileSystem();
> Path path = new Path(rootPath, jobId.toString());
> OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
> try {
> LOG.info("===========================Wait 5 seconds..");
> Thread.sleep(5000);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> try (JsonGenerator gen = jacksonFactory.createGenerator(out,
> JsonEncoding.UTF8)) {
> ... // Part of the code is omitted here
> } catch (Exception e) {
> fs.delete(path, false);
> throw e;
> }
> LOG.info("Job {} has been archived at {}.", jobId, path);
> return path;
> } catch (IOException e) {
> LOG.error("Failed to archive job.", e);
> throw e;
> }
> }
> {code}
> After making the above changes to the code, I cannot find the corresponding
> job on Flink's HistoryServer (refer to Figure 1.png and Figure 2.png).
> Then I browsed the JobManager log on Yarn (see the attachment
> application_1612404624605_0010-JobManager.log for details) and found that
> the following log line was missing from the task log:
> {code:java}
> INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process
> YarnJobClusterEntrypoint with exit code 0.{code}
> Usually, if the task exits normally, a similar log will be printed before
> executing {color:#0747a6}System.exit(returnCode){color}.
> If the above situation occurs but no exception is found in the JobManager
> log, it means that at some point no user (non-daemon) thread was left in the
> JobManager process, so the JVM exited without completing the normal shutdown
> sequence.
> Eventually I found out that multiple services (for example: ioExecutor,
> metricRegistry, commonRpcService) are shut down asynchronously in
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color},
> and that the shutdown() method of metricRegistry in turn shuts down further
> services (for example: executor). All of these shutdown actions are executed
> asynchronously and in parallel. If ioExecutor or executor exits last, the
> problem above occurs. The root cause is that the threads in these two
> executors are daemon threads, while the threads in the Akka actor system are
> user threads.
> I hope to modify the following code to fix this bug. If it is confirmed that
> this is a bug (the problem affects all versions above 1.9), please assign
> the ticket to me, thank you.
> Only the
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color}
> method needs to be modified:
> After fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
> final String clusterEntrypointName =
> clusterEntrypoint.getClass().getSimpleName();
> try {
> clusterEntrypoint.startCluster();
> } catch (ClusterEntrypointException e) {
> LOG.error(String.format("Could not start cluster entrypoint %s.",
> clusterEntrypointName), e);
> System.exit(STARTUP_FAILURE_RETURN_CODE);
> }
> int returnCode;
> Throwable throwable = null;
> try {
> returnCode =
> clusterEntrypoint.getTerminationFuture().get().processExitCode();
> } catch (Throwable e) {
> throwable = e;
> returnCode = RUNTIME_FAILURE_RETURN_CODE;
> }
> LOG.info("Terminating cluster entrypoint process {} with exit code {}.",
> clusterEntrypointName, returnCode, throwable);
> System.exit(returnCode);
> }{code}
> Before fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
> final String clusterEntrypointName =
> clusterEntrypoint.getClass().getSimpleName();
> try {
> clusterEntrypoint.startCluster();
> } catch (ClusterEntrypointException e) {
> LOG.error(String.format("Could not start cluster entrypoint %s.",
> clusterEntrypointName), e);
> System.exit(STARTUP_FAILURE_RETURN_CODE);
> }
> clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus,
> throwable) -> {
> final int returnCode;
> if (throwable != null) {
> returnCode = RUNTIME_FAILURE_RETURN_CODE;
> } else {
> returnCode = applicationStatus.processExitCode();
> }
> LOG.info("Terminating cluster entrypoint process {} with exit code {}.",
> clusterEntrypointName, returnCode, throwable);
> System.exit(returnCode);
> });
> }
> {code}
> The purpose of the modification is to ensure that the Main thread exits last.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)