This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c6b9ab  Use javaOptsArray provided in task context (#12326)
9c6b9ab is described below

commit 9c6b9abcdef1b0e995142f9819345ea270f79f67
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Mar 28 16:33:40 2022 +0530

    Use javaOptsArray provided in task context (#12326)
    
    The `javaOpts` property is read from the task context, but `javaOptsArray` is not.
    Changes:
    - Read `javaOptsArray` from task context in `ForkingTaskRunner`.
    - Add test to verify that `javaOptsArray` in task context takes precedence over `javaOpts`.
---
 .../druid/indexing/overlord/ForkingTaskRunner.java |  19 +++
 .../indexing/overlord/ForkingTaskRunnerTest.java   | 136 +++++++++++++++++++++
 2 files changed, 155 insertions(+)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index f9c36ef..c35e10c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.overlord;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CharMatcher;
@@ -219,6 +220,24 @@ public class ForkingTaskRunner
                           );
                         }
 
+                        // Override task specific javaOptsArray
+                        try {
+                          List<String> taskJavaOptsArray = 
jsonMapper.convertValue(
+                              
task.getContextValue(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY),
+                              new TypeReference<List<String>>() {}
+                          );
+                          if (taskJavaOptsArray != null) {
+                            Iterables.addAll(command, taskJavaOptsArray);
+                          }
+                        }
+                        catch (Exception e) {
+                          throw new IllegalArgumentException(
+                              ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY
+                              + " in context of task: " + task.getId() + " 
must be an array of strings.",
+                              e
+                          );
+                        }
+
                         for (String propName : props.stringPropertyNames()) {
                           for (String allowedPrefix : 
config.getAllowedPrefixes()) {
                             // See https://github.com/apache/druid/issues/1841
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
index 5a11651..ae5c49d 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.overlord;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
@@ -48,9 +49,13 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class ForkingTaskRunnerTest
 {
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   // This tests the test to make sure the test fails when it should.
   @Test(expected = AssertionError.class)
   public void testPatternMatcherFailureForJavaOptions()
@@ -356,4 +361,135 @@ public class ForkingTaskRunnerTest
     Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
     Assert.assertEquals("task failure test", status.getErrorMsg());
   }
+
+  @Test
+  public void testJavaOptsAndJavaOptsArrayOverride() throws 
ExecutionException, InterruptedException,
+                                                            
JsonProcessingException
+  {
+    ObjectMapper mapper = new DefaultObjectMapper();
+    final String taskContent = "{\n"
+                               + "  \"type\" : \"noop\",\n"
+                               + "  \"id\" : 
\"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
+                               + "  \"groupId\" : 
\"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
+                               + "  \"dataSource\" : \"none\",\n"
+                               + "  \"runTime\" : 2500,\n"
+                               + "  \"isReadyTime\" : 0,\n"
+                               + "  \"isReadyResult\" : \"YES\",\n"
+                               + "  \"firehose\" : null,\n"
+                               + "  \"context\" : {\n"
+                               + "    \"druid.indexer.runner.javaOptsArray\" : 
[ \"-Xmx10g\", \"-Xms10g\" ],\n"
+                               + "    \"druid.indexer.runner.javaOpts\" : 
\"-Xmx1g -Xms1g\"\n"
+                               + "  }\n"
+                               + "}";
+    final Task task = OBJECT_MAPPER.readValue(taskContent, NoopTask.class);
+    final AtomicInteger xmxJavaOptsIndex = new AtomicInteger(-1);
+    final AtomicInteger xmxJavaOptsArrayIndex = new AtomicInteger(-1);
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        new TaskConfig(
+            null,
+            null,
+            null,
+            null,
+            ImmutableList.of(),
+            false,
+            new Period("PT0S"),
+            new Period("PT10S"),
+            ImmutableList.of(),
+            false,
+            false,
+            TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
+            null
+        ),
+        new WorkerConfig(),
+        new Properties(),
+        new NoopTaskLogs(),
+        mapper,
+        new DruidNode("middleManager", "host", false, 8091, null, true, false),
+        new StartupLoggingConfig()
+    )
+    {
+      @Override
+      ProcessHolder runTaskProcess(List<String> command, File logFile, 
TaskLocation taskLocation)
+      {
+        xmxJavaOptsIndex.set(command.indexOf("-Xmx1g"));
+        xmxJavaOptsArrayIndex.set(command.indexOf("-Xmx10g"));
+
+        return Mockito.mock(ProcessHolder.class);
+      }
+
+      @Override
+      int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, 
File logFile, File reportsFile)
+      {
+        return 1;
+      }
+    };
+
+    forkingTaskRunner.run(task).get();
+    Assert.assertTrue(xmxJavaOptsArrayIndex.get() > xmxJavaOptsIndex.get());
+    Assert.assertTrue(xmxJavaOptsIndex.get() >= 0);
+  }
+
+  @Test
+  public void testInvalidTaskContextJavaOptsArray() throws 
JsonProcessingException
+  {
+    ObjectMapper mapper = new DefaultObjectMapper();
+    final String taskContent = "{\n"
+                               + "  \"type\" : \"noop\",\n"
+                               + "  \"id\" : 
\"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
+                               + "  \"groupId\" : 
\"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
+                               + "  \"dataSource\" : \"none\",\n"
+                               + "  \"runTime\" : 2500,\n"
+                               + "  \"isReadyTime\" : 0,\n"
+                               + "  \"isReadyResult\" : \"YES\",\n"
+                               + "  \"firehose\" : null,\n"
+                               + "  \"context\" : {\n"
+                               + "    \"druid.indexer.runner.javaOptsArray\" : 
\"not a string array\",\n"
+                               + "    \"druid.indexer.runner.javaOpts\" : 
\"-Xmx1g -Xms1g\"\n"
+                               + "  }\n"
+                               + "}";
+    final Task task = OBJECT_MAPPER.readValue(taskContent, NoopTask.class);
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        new TaskConfig(
+            null,
+            null,
+            null,
+            null,
+            ImmutableList.of(),
+            false,
+            new Period("PT0S"),
+            new Period("PT10S"),
+            ImmutableList.of(),
+            false,
+            false,
+            TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
+            null
+        ),
+        new WorkerConfig(),
+        new Properties(),
+        new NoopTaskLogs(),
+        mapper,
+        new DruidNode("middleManager", "host", false, 8091, null, true, false),
+        new StartupLoggingConfig()
+    )
+    {
+      @Override
+      ProcessHolder runTaskProcess(List<String> command, File logFile, 
TaskLocation taskLocation)
+      {
+        return Mockito.mock(ProcessHolder.class);
+      }
+
+      @Override
+      int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, 
File logFile, File reportsFile)
+      {
+        return 1;
+      }
+    };
+
+    ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> 
forkingTaskRunner.run(task).get());
+    
Assert.assertTrue(e.getMessage().endsWith(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY
+                                              + " in context of task: " + 
task.getId() + " must be an array of strings.")
+    );
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to