jihoonson commented on a change in pull request #11446:
URL: https://github.com/apache/druid/pull/11446#discussion_r672676958
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
##########
@@ -364,38 +362,19 @@ public TaskStatus call()
);
LOGGER.info("Logging task %s output to: %s",
task.getId(), logFile);
- boolean runFailed = true;
-
- final ByteSink logSink = Files.asByteSink(logFile,
FileWriteMode.APPEND);
-
- // This will block for a while. So we append the thread
information with more details
- final String priorThreadName =
Thread.currentThread().getName();
-
Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName,
task.getId()));
-
- try (final OutputStream toLogfile =
logSink.openStream()) {
-
ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
- final int statusCode = processHolder.process.waitFor();
- LOGGER.info("Process exited with status[%d] for task:
%s", statusCode, task.getId());
- if (statusCode == 0) {
- runFailed = false;
- }
- }
- finally {
- Thread.currentThread().setName(priorThreadName);
- // Upload task logs
- taskLogPusher.pushTaskLog(task.getId(), logFile);
- if (reportsFile.exists()) {
- taskLogPusher.pushTaskReports(task.getId(),
reportsFile);
- }
- }
-
- TaskStatus status;
- if (!runFailed) {
+ final int exitCode = waitForTaskProcessToComplete(task,
processHolder, logFile, reportsFile);
+ final TaskStatus status;
+ if (exitCode == 0) {
+ LOGGER.info("Process exited successfully for task:
%s", task.getId());
// Process exited successfully
status = jsonMapper.readValue(statusFile,
TaskStatus.class);
} else {
+ LOGGER.error("Process exited with code[%d] for task:
%s", exitCode, task.getId());
// Process exited unsuccessfully
- status = TaskStatus.failure(task.getId());
+ status = TaskStatus.failure(
+ task.getId(),
+ "Task execution process exited unsuccessfully. See
middleManager logs for more details."
Review comment:
It seems reasonable to include it. Unlike with other exceptions thrown in
Druid, the middleManager logs will probably not provide much information about
the process failure. :+1:
##########
File path:
indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
##########
@@ -145,12 +162,192 @@ public void testMaskedIterator()
"-Dsome.somepassword = secret=value",
"-Dsome.some=notasecret",
"-Dsome.otherSecret= =asfdhkj352872598====fasdlkjfa="
- ),
- "java -cp /path/to/somewhere:some-jars.jar /some===file
/asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty =
random=random " +
- "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked>
-Dsome.somepassword =<masked> -Dsome.some=notasecret
-Dsome.otherSecret=<masked>"
+ ),
+ "java -cp /path/to/somewhere:some-jars.jar /some===file
/asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty =
random=random "
+ + "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked>
-Dsome.somepassword =<masked> -Dsome.some=notasecret
-Dsome.otherSecret=<masked>"
);
StartupLoggingConfig startupLoggingConfig = new StartupLoggingConfig();
- ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(new
ForkingTaskRunnerConfig(), null, new WorkerConfig(), null, null, null, null,
startupLoggingConfig);
- Assert.assertEquals(originalAndExpectedCommand.rhs,
forkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(),
originalAndExpectedCommand.lhs));
+ ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+ new ForkingTaskRunnerConfig(),
+ null,
+ new WorkerConfig(),
+ null,
+ null,
+ null,
+ null,
+ startupLoggingConfig
+ );
+ Assert.assertEquals(
+ originalAndExpectedCommand.rhs,
+ forkingTaskRunner.getMaskedCommand(
+ startupLoggingConfig.getMaskProperties(),
+ originalAndExpectedCommand.lhs
+ )
+ );
+ }
+
+ @Test
+ public void testTaskStatusWhenTaskProcessFails() throws ExecutionException,
InterruptedException
+ {
+ ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+ new ForkingTaskRunnerConfig(),
+ new TaskConfig(
+ null,
+ null,
+ null,
+ null,
+ ImmutableList.of(),
+ false,
+ new Period("PT0S"),
+ new Period("PT10S"),
+ ImmutableList.of(),
+ false,
+ false
+ ),
+ new WorkerConfig(),
+ new Properties(),
+ new NoopTaskLogs(),
+ new DefaultObjectMapper(),
+ new DruidNode("middleManager", "host", false, 8091, null, true, false),
+ new StartupLoggingConfig()
+ )
+ {
+ @Override
+ ProcessHolder runTaskProcess(List<String> command, File logFile,
TaskLocation taskLocation)
+ {
+ ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
+
Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
+ Mockito.doNothing().when(processHolder).shutdown();
+ return processHolder;
+ }
+
+ @Override
+ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder,
File logFile, File reportsFile)
+ {
+ // Emulate task process failure
+ return 1;
+ }
+ };
+
+ final TaskStatus status = forkingTaskRunner.run(NoopTask.create()).get();
+ Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+ Assert.assertEquals(
+ "Task execution process exited unsuccessfully. See middleManager logs
for more details.",
Review comment:
Oops, thanks! :+1:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]