[
https://issues.apache.org/jira/browse/FLINK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279454#comment-17279454
]
Jichao Wang edited comment on FLINK-21274 at 2/5/21, 11:28 AM:
---------------------------------------------------------------
Let me try to explain this further:
I added some log statements that print thread information to
{color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint{color} to
capture the state of the process before the JobManager exits. The modified code is shown below:
// code1
{code:java}
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
    final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
    try {
        clusterEntrypoint.startCluster();
    } catch (ClusterEntrypointException e) {
        LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }

    // code1-1
    clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
        final int returnCode;
        LOG.info("================clusterEntrypoint.getTerminationFuture isTerminal======================================");
        LOG.info("===Current thread name is: " + Thread.currentThread().getName()
                + ", isDaemon: " + Thread.currentThread().isDaemon());
        printThreads();
        if (throwable != null) {
            returnCode = RUNTIME_FAILURE_RETURN_CODE;
        } else {
            returnCode = applicationStatus.processExitCode();
        }
        LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
        System.exit(returnCode);
    });
}
{code}
// code2
{code:java}
private void cleanupDirectories() throws IOException {
    LOG.info("===================Starting cleanupDirectories================================");
    LOG.info("cleanupDirectories===Current thread name is: " + Thread.currentThread().getName()
            + ", isDaemon: " + Thread.currentThread().isDaemon());
    printThreads("cleanupDirectories");

    ShutdownHookUtil.removeShutdownHook(shutDownHook, getClass().getSimpleName(), LOG);

    final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);
    FileUtils.deleteDirectory(new File(webTmpDir));
}
{code}
// code3
{code:java}
protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
    final long shutdownTimeout = configuration.getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT);

    synchronized (lock) {
        Throwable exception = null;

        final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

        if (blobServer != null) {
            try {
                blobServer.close();
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }
        }

        if (haServices != null) {
            try {
                if (cleanupHaData) {
                    haServices.closeAndCleanupAllData();
                } else {
                    haServices.close();
                }
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }
        }

        if (archivedExecutionGraphStore != null) {
            try {
                archivedExecutionGraphStore.close();
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }
        }

        if (processMetricGroup != null) {
            processMetricGroup.close();
        }

        if (metricRegistry != null) {
            LOG.info("===metricRegistry is not null");
            CompletableFuture<Void> futureMetricRegistry = metricRegistry.shutdown();
            terminationFutures.add(futureMetricRegistry);
            futureMetricRegistry.whenComplete((aVoid, throwable) -> {
                // code3-1
                LOG.info("========================metricRegistry shutdowns successfully===================================");
                LOG.info("metricRegistry===Current thread name is: " + Thread.currentThread().getName()
                        + ", isDaemon: " + Thread.currentThread().isDaemon());
                printThreads("metricRegistry");
            });
        }

        if (ioExecutor != null) {
            LOG.info("===ioExecutor is not null");
            CompletableFuture<Void> futureIoExecutor =
                    ExecutorUtils.nonBlockingShutdown(shutdownTimeout, TimeUnit.MILLISECONDS, ioExecutor);
            terminationFutures.add(futureIoExecutor);
            futureIoExecutor.whenComplete((aVoid, throwable) -> {
                // code3-2
                LOG.info("==============ioExecutor shutdowns successfully==========================");
                LOG.info("ioExecutor===Current thread name is: " + Thread.currentThread().getName()
                        + ", isDaemon: " + Thread.currentThread().isDaemon());
                printThreads("ioExecutor");
            });
        }

        if (commonRpcService != null) {
            LOG.info("===commonRpcService is not null");
            CompletableFuture<Void> futureCommonRpcService = commonRpcService.stopService();
            terminationFutures.add(futureCommonRpcService);
            futureCommonRpcService.whenComplete((aVoid, throwable) -> {
                // code3-3
                LOG.info("============================commonRpcService shutdowns successfully=====================================");
                LOG.info("commonRpcService===Current thread name is: " + Thread.currentThread().getName()
                        + ", isDaemon: " + Thread.currentThread().isDaemon());
                printThreads("commonRpcService");
            });
        }

        if (exception != null) {
            terminationFutures.add(FutureUtils.completedExceptionally(exception));
        }

        return FutureUtils.completeAll(terminationFutures);
    }
}
{code}
// code4
{code:java}
public static void printThreads() {
    // No-tag overload used by code1-1; the other call sites pass a tag string.
    printThreads("");
}

public static void printThreads(String name) {
    // Walk up to the root thread group so that all live threads are enumerated.
    ThreadGroup group = Thread.currentThread().getThreadGroup();
    ThreadGroup topGroup = group;
    while (group != null) {
        topGroup = group;
        group = group.getParent();
    }
    int slackSize = topGroup.activeCount() * 2;
    Thread[] slackThreads = new Thread[slackSize];
    int actualSize = topGroup.enumerate(slackThreads);
    Thread[] actualThreads = new Thread[actualSize];
    System.arraycopy(slackThreads, 0, actualThreads, 0, actualSize);
    LOG.info(name + "===Threads size is " + actualThreads.length);
    for (Thread thread : actualThreads) {
        LOG.info(name + "===Thread name : " + thread.getName() + ", isDaemon: " + thread.isDaemon());
    }
}
{code}
The attached log files are described as follows:
[^Add wait 5 seconds in org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log] corresponds to {color:#0747a6}code5{color}, where a 5-second wait is added to the {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color} method.
[^Not add wait 5 seconds.log] corresponds to {color:#0747a6}code6{color}, where the 5-second wait is removed from the {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color} method.
// code5
{code:java}
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
        throws IOException {
    try {
        FileSystem fs = rootPath.getFileSystem();
        Path path = new Path(rootPath, jobId.toString());
        OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

        try {
            LOG.info("===========================Wait 5 seconds..");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            ... // Part of the code is omitted here
        } catch (Exception e) {
            fs.delete(path, false);
            throw e;
        }
        LOG.info("Job {} has been archived at {}.", jobId, path);
        return path;
    } catch (IOException e) {
        LOG.error("Failed to archive job.", e);
        throw e;
    }
}
{code}
// code6
{code:java}
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
        throws IOException {
    try {
        FileSystem fs = rootPath.getFileSystem();
        Path path = new Path(rootPath, jobId.toString());
        OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
        // No 5-second wait logic here
        try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            ... // Part of the code is omitted here
        } catch (Exception e) {
            fs.delete(path, false);
            throw e;
        }
        LOG.info("Job {} has been archived at {}.", jobId, path);
        return path;
    } catch (IOException e) {
        LOG.error("Failed to archive job.", e);
        throw e;
    }
}
{code}
I think the thread information printed in {color:#0747a6}code1-1{color} and {color:#0747a6}code2{color} reflects the state of the threads just before the JobManager process exits.
If the 5-second wait is added, {color:#0747a6}code3-2{color} and {color:#0747a6}code1-1{color} are never executed, which means the ioExecutor does not shut down normally, and {color:#0747a6}code2{color} ends up being executed by the YarnJobClusterEntrypoint shutdown hook. Why does this happen? I think we need to look at it carefully.
If the 5-second wait is removed, the program exits normally and {color:#0747a6}code1-1{color} is executed by the {color:#0747a6}flink-akka.actor.default-dispatcher-16{color} thread. Correspondingly, there is another possibility: if the metricRegistry shuts down after the commonRpcService, {color:#0747a6}code1-1{color} is executed by {color:#0747a6}flink-metrics-scheduler-1{color}.
The JobManager shuts down multiple thread pools in parallel before the process exits, so different shutdown orders result in different threads executing {color:#0747a6}code1-1{color}. If the pool that finishes last is a daemon pool (for example, the ioExecutor), {color:#0747a6}code1-1{color} is never executed. This is the root cause of the problem.
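To make the mechanism concrete, here is a minimal standalone sketch (this is not Flink code; the class name, thread name, and 2-second delay are invented for the illustration). A whenComplete() callback runs on whichever thread completes the future, and once only daemon threads remain the JVM exits immediately, so a callback that would have run on a daemon thread is simply skipped:
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DaemonShutdownDemo {

    public static void main(String[] args) {
        // A single-threaded daemon pool standing in for the ioExecutor.
        ScheduledExecutorService daemonPool =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "demo-daemon-io");
                    t.setDaemon(true);
                    return t;
                });

        CompletableFuture<Void> termination = new CompletableFuture<>();

        // The "termination future" is completed from the daemon thread after a delay,
        // mimicking the slow archive upload that delays the ioExecutor shutdown.
        daemonPool.schedule(() -> { termination.complete(null); }, 2, TimeUnit.SECONDS);

        // The callback would run on the completing (daemon) thread. But main() returns
        // right after this line, only daemon threads remain, and the JVM exits before
        // the future is completed -- so nothing is printed, just as code1-1 is skipped.
        termination.whenComplete((ignored, throwable) ->
                System.out.println("Terminating cluster entrypoint process with exit code 0."));
    }
}
{code}
If the pool thread is created with setDaemon(false), as in code8, the message does appear (the pool then has to be shut down explicitly for the JVM to exit).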
If the {color:#0747a6}org.apache.flink.runtime.util.ExecutorThreadFactory#newThread{color} method used by the ioExecutor is changed from {color:#0747a6}code7{color} to {color:#0747a6}code8{color}, then {color:#0747a6}code1-1{color} is executed even if we wait 10 seconds in {color:#0747a6}code5{color}. In that case, the thread executing code1-1 is {color:#0747a6}ForkJoinPool.commonPool-worker-57{color}.
// code7
{code:java}
@Override
public Thread newThread(Runnable runnable) {
    Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement());
    t.setDaemon(true);
    t.setPriority(threadPriority);
    // optional handler for uncaught exceptions
    if (exceptionHandler != null) {
        t.setUncaughtExceptionHandler(exceptionHandler);
    }
    return t;
}
{code}
// code8
{code:java}
@Override
public Thread newThread(Runnable runnable) {
    Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement());
    t.setDaemon(false);
    t.setPriority(threadPriority);
    // optional handler for uncaught exceptions
    if (exceptionHandler != null) {
        t.setUncaughtExceptionHandler(exceptionHandler);
    }
    return t;
}
{code}
I think {color:#0747a6}code1-1{color} needs to be executed by the main thread; otherwise, if a daemon thread pool is the last to shut down, {color:#0747a6}code1-1{color} will never be executed.
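For comparison, here is the same toy setup with the main thread blocking on the future (again a hypothetical standalone example, not the actual Flink code): because the main user thread stays alive until the future completes, the exit logic always runs, and it runs on the main thread. This is essentially the shape of the fix proposed in the issue description below.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MainThreadWaitDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Same daemon pool as in the previous sketch.
        ScheduledExecutorService daemonPool =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "demo-daemon-io");
                    t.setDaemon(true);
                    return t;
                });

        CompletableFuture<Void> termination = new CompletableFuture<>();
        daemonPool.schedule(() -> { termination.complete(null); }, 2, TimeUnit.SECONDS);

        // The main (non-daemon) thread blocks here, so the JVM cannot exit until the
        // future completes, and the line below always runs -- on the main thread.
        termination.get();
        System.out.println("Terminating cluster entrypoint process with exit code 0.");
    }
}
{code}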
[~fly_in_gis] What do you think?
> In per-job mode, if the HDFS write is slow (about 5 seconds), the Flink job
> archive file upload fails
> --------------------------------------------------------------------------------------------------------
>
> Key: FLINK-21274
> URL: https://issues.apache.org/jira/browse/FLINK-21274
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.10.1
> Reporter: Jichao Wang
> Priority: Critical
> Attachments: 1.png, 2.png, Add wait 5 seconds in
> org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log, Not add wait
> 5 seconds.log, application_1612404624605_0010-JobManager.log
>
>
> This is a partial configuration of my Flink History service (flink-conf.yaml),
> and it is also the configuration of my Flink client.
> {code:java}
> jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> {code}
> I used {color:#0747a6}flink run -m yarn-cluster
> /cloud/service/flink/examples/batch/WordCount.jar{color} to submit a
> WordCount job to the Yarn cluster. Under normal circumstances, after the
> job completes, the Flink job execution information is archived to HDFS and
> the JobManager process then exits. However, when this archiving process
> takes a long time (perhaps because the HDFS write speed is slow), the job
> archive file upload fails.
> The specific reproduction method is as follows:
> Modify the
> {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
> method to wait 5 seconds before actually writing to HDFS (simulating a slow
> write speed scenario).
> {code:java}
> public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
>         throws IOException {
>     try {
>         FileSystem fs = rootPath.getFileSystem();
>         Path path = new Path(rootPath, jobId.toString());
>         OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
>         try {
>             LOG.info("===========================Wait 5 seconds..");
>             Thread.sleep(5000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>         try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
>             ... // Part of the code is omitted here
>         } catch (Exception e) {
>             fs.delete(path, false);
>             throw e;
>         }
>         LOG.info("Job {} has been archived at {}.", jobId, path);
>         return path;
>     } catch (IOException e) {
>         LOG.error("Failed to archive job.", e);
>         throw e;
>     }
> }
> {code}
> After making the above change, I cannot find the corresponding job on Flink's
> HistoryServer (refer to Figures 1.png and 2.png).
> I then browsed the JobManager log on Yarn (see the attachment
> application_1612404624605_0010-JobManager.log for details) and found that the
> following log line is missing from the job's log:
> {code:java}
> INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 0.{code}
> Usually, if the job exits normally, such a log line is printed just before
> {color:#0747a6}System.exit(returnCode){color} is executed.
> Since no exception appears in the JobManager log, the situation above indicates
> that the JobManager reached a point during shutdown where no user (non-daemon)
> thread was left in the process, so the JVM exited before the normal shutdown
> sequence completed.
> Eventually I found that several services (for example, the ioExecutor,
> metricRegistry, and commonRpcService) are shut down asynchronously in
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color},
> and that the shutdown() method of the metricRegistry in turn shuts down further
> services (for example, its executor); all of these shutdown actions run
> asynchronously and in parallel. If the ioExecutor or that executor is the last
> to finish, the problem above occurs. The root cause is that the threads in these
> two executors are daemon threads, while the Akka actor threads are user threads.
> I would like to fix this bug with the change below. If this is confirmed to be a
> bug (it affects all versions above 1.9), please assign the ticket to me. Thank you.
> Only the
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color}
> method needs to be modified:
> After fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>     final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
>     try {
>         clusterEntrypoint.startCluster();
>     } catch (ClusterEntrypointException e) {
>         LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
>         System.exit(STARTUP_FAILURE_RETURN_CODE);
>     }
>     int returnCode;
>     Throwable throwable = null;
>     try {
>         returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
>     } catch (Throwable e) {
>         throwable = e;
>         returnCode = RUNTIME_FAILURE_RETURN_CODE;
>     }
>     LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
>     System.exit(returnCode);
> }{code}
> Before fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>     final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
>     try {
>         clusterEntrypoint.startCluster();
>     } catch (ClusterEntrypointException e) {
>         LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
>         System.exit(STARTUP_FAILURE_RETURN_CODE);
>     }
>     clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
>         final int returnCode;
>         if (throwable != null) {
>             returnCode = RUNTIME_FAILURE_RETURN_CODE;
>         } else {
>             returnCode = applicationStatus.processExitCode();
>         }
>         LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
>         System.exit(returnCode);
>     });
> }
> {code}
> The purpose of this modification is to ensure that the main thread exits last.