This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9c6b9ab Use javaOptsArray provided in task context (#12326)
9c6b9ab is described below
commit 9c6b9abcdef1b0e995142f9819345ea270f79f67
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Mar 28 16:33:40 2022 +0530
Use javaOptsArray provided in task context (#12326)
The `javaOpts` property is being read from task context but not
`javaOptsArray`.
Changes:
- Read `javaOptsArray` from task context in `ForkingTaskRunner`.
- Add test to verify that `javaOptsArray` in task context takes precedence
over `javaOpts`
---
.../druid/indexing/overlord/ForkingTaskRunner.java | 19 +++
.../indexing/overlord/ForkingTaskRunnerTest.java | 136 +++++++++++++++++++++
2 files changed, 155 insertions(+)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index f9c36ef..c35e10c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.overlord;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CharMatcher;
@@ -219,6 +220,24 @@ public class ForkingTaskRunner
);
}
+ // Override task specific javaOptsArray
+ try {
+ List<String> taskJavaOptsArray =
jsonMapper.convertValue(
+
task.getContextValue(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY),
+ new TypeReference<List<String>>() {}
+ );
+ if (taskJavaOptsArray != null) {
+ Iterables.addAll(command, taskJavaOptsArray);
+ }
+ }
+ catch (Exception e) {
+ throw new IllegalArgumentException(
+ ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY
+ + " in context of task: " + task.getId() + "
must be an array of strings.",
+ e
+ );
+ }
+
for (String propName : props.stringPropertyNames()) {
for (String allowedPrefix :
config.getAllowedPrefixes()) {
// See https://github.com/apache/druid/issues/1841
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
index 5a11651..ae5c49d 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.overlord;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
@@ -48,9 +49,13 @@ import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
public class ForkingTaskRunnerTest
{
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
// This tests the test to make sure the test fails when it should.
@Test(expected = AssertionError.class)
public void testPatternMatcherFailureForJavaOptions()
@@ -356,4 +361,135 @@ public class ForkingTaskRunnerTest
Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
Assert.assertEquals("task failure test", status.getErrorMsg());
}
+
+ @Test
+ public void testJavaOptsAndJavaOptsArrayOverride() throws
ExecutionException, InterruptedException,
+
JsonProcessingException
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ final String taskContent = "{\n"
+ + " \"type\" : \"noop\",\n"
+ + " \"id\" :
\"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
+ + " \"groupId\" :
\"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
+ + " \"dataSource\" : \"none\",\n"
+ + " \"runTime\" : 2500,\n"
+ + " \"isReadyTime\" : 0,\n"
+ + " \"isReadyResult\" : \"YES\",\n"
+ + " \"firehose\" : null,\n"
+ + " \"context\" : {\n"
+ + " \"druid.indexer.runner.javaOptsArray\" :
[ \"-Xmx10g\", \"-Xms10g\" ],\n"
+ + " \"druid.indexer.runner.javaOpts\" :
\"-Xmx1g -Xms1g\"\n"
+ + " }\n"
+ + "}";
+ final Task task = OBJECT_MAPPER.readValue(taskContent, NoopTask.class);
+ final AtomicInteger xmxJavaOptsIndex = new AtomicInteger(-1);
+ final AtomicInteger xmxJavaOptsArrayIndex = new AtomicInteger(-1);
+ ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+ new ForkingTaskRunnerConfig(),
+ new TaskConfig(
+ null,
+ null,
+ null,
+ null,
+ ImmutableList.of(),
+ false,
+ new Period("PT0S"),
+ new Period("PT10S"),
+ ImmutableList.of(),
+ false,
+ false,
+ TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
+ null
+ ),
+ new WorkerConfig(),
+ new Properties(),
+ new NoopTaskLogs(),
+ mapper,
+ new DruidNode("middleManager", "host", false, 8091, null, true, false),
+ new StartupLoggingConfig()
+ )
+ {
+ @Override
+ ProcessHolder runTaskProcess(List<String> command, File logFile,
TaskLocation taskLocation)
+ {
+ xmxJavaOptsIndex.set(command.indexOf("-Xmx1g"));
+ xmxJavaOptsArrayIndex.set(command.indexOf("-Xmx10g"));
+
+ return Mockito.mock(ProcessHolder.class);
+ }
+
+ @Override
+ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder,
File logFile, File reportsFile)
+ {
+ return 1;
+ }
+ };
+
+ forkingTaskRunner.run(task).get();
+ Assert.assertTrue(xmxJavaOptsArrayIndex.get() > xmxJavaOptsIndex.get());
+ Assert.assertTrue(xmxJavaOptsIndex.get() >= 0);
+ }
+
+ @Test
+ public void testInvalidTaskContextJavaOptsArray() throws
JsonProcessingException
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ final String taskContent = "{\n"
+ + " \"type\" : \"noop\",\n"
+ + " \"id\" :
\"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
+ + " \"groupId\" :
\"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
+ + " \"dataSource\" : \"none\",\n"
+ + " \"runTime\" : 2500,\n"
+ + " \"isReadyTime\" : 0,\n"
+ + " \"isReadyResult\" : \"YES\",\n"
+ + " \"firehose\" : null,\n"
+ + " \"context\" : {\n"
+ + " \"druid.indexer.runner.javaOptsArray\" :
\"not a string array\",\n"
+ + " \"druid.indexer.runner.javaOpts\" :
\"-Xmx1g -Xms1g\"\n"
+ + " }\n"
+ + "}";
+ final Task task = OBJECT_MAPPER.readValue(taskContent, NoopTask.class);
+ ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+ new ForkingTaskRunnerConfig(),
+ new TaskConfig(
+ null,
+ null,
+ null,
+ null,
+ ImmutableList.of(),
+ false,
+ new Period("PT0S"),
+ new Period("PT10S"),
+ ImmutableList.of(),
+ false,
+ false,
+ TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
+ null
+ ),
+ new WorkerConfig(),
+ new Properties(),
+ new NoopTaskLogs(),
+ mapper,
+ new DruidNode("middleManager", "host", false, 8091, null, true, false),
+ new StartupLoggingConfig()
+ )
+ {
+ @Override
+ ProcessHolder runTaskProcess(List<String> command, File logFile,
TaskLocation taskLocation)
+ {
+ return Mockito.mock(ProcessHolder.class);
+ }
+
+ @Override
+ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder,
File logFile, File reportsFile)
+ {
+ return 1;
+ }
+ };
+
+ ExecutionException e = Assert.assertThrows(ExecutionException.class, () ->
forkingTaskRunner.run(task).get());
+
Assert.assertTrue(e.getMessage().endsWith(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY
+ + " in context of task: " +
task.getId() + " must be an array of strings.")
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]