This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9b88b78ba4 Fix race condition in KubernetesTaskRunner when task is added to the map (#14643)
9b88b78ba4 is described below
commit 9b88b78ba44e3de64dfa9978e95e3ee3cda221b6
Author: YongGang <[email protected]>
AuthorDate: Thu Jul 27 00:04:36 2023 -0700
Fix race condition in KubernetesTaskRunner when task is added to the map (#14643)
Changes:
- Fix race condition in KubernetesTaskRunner introduced by #14435
- Perform additions to and removals from the tasks map inside a synchronized block
- Update tests
---
.../druid/k8s/overlord/KubernetesTaskRunner.java | 61 ++++++++++++----------
.../k8s/overlord/KubernetesTaskRunnerTest.java | 9 ++--
2 files changed, 36 insertions(+), 34 deletions(-)
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 30bea70416..12c5bb6029 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -134,16 +134,18 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
@Override
public ListenableFuture<TaskStatus> run(Task task)
{
- return tasks.computeIfAbsent(
- task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() ->
runTask(task)))
- ).getResult();
+ synchronized (tasks) {
+ return tasks.computeIfAbsent(task.getId(), k -> new
KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
+ .getResult();
+ }
}
protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
- return tasks.computeIfAbsent(
- task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() ->
joinTask(task)))
- ).getResult();
+ synchronized (tasks) {
+ return tasks.computeIfAbsent(task.getId(), k -> new
KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
+ .getResult();
+ }
}
private TaskStatus runTask(Task task)
@@ -159,21 +161,23 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
@VisibleForTesting
protected TaskStatus doTask(Task task, boolean run)
{
- KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
+ try {
+ KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
- KubernetesWorkItem workItem = tasks.get(task.getId());
+ synchronized (tasks) {
+ KubernetesWorkItem workItem = tasks.get(task.getId());
- if (workItem == null) {
- throw new ISE("Task [%s] disappeared", task.getId());
- }
+ if (workItem == null) {
+ throw new ISE("Task [%s] disappeared", task.getId());
+ }
- if (workItem.isShutdownRequested()) {
- throw new ISE("Task [%s] has been shut down", task.getId());
- }
+ if (workItem.isShutdownRequested()) {
+ throw new ISE("Task [%s] has been shut down", task.getId());
+ }
- workItem.setKubernetesPeonLifecycle(peonLifecycle);
+ workItem.setKubernetesPeonLifecycle(peonLifecycle);
+ }
- try {
TaskStatus taskStatus;
if (run) {
taskStatus = peonLifecycle.run(
@@ -191,14 +195,14 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
return taskStatus;
}
-
catch (Exception e) {
log.error(e, "Task [%s] execution caught an exception", task.getId());
throw new RuntimeException(e);
}
-
finally {
- tasks.remove(task.getId());
+ synchronized (tasks) {
+ tasks.remove(task.getId());
+ }
}
}
@@ -269,17 +273,17 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
- List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = new ArrayList<>();
+ List<Pair<Task, ListenableFuture<TaskStatus>>> restoredTasks = new
ArrayList<>();
for (Job job : client.getPeonJobs()) {
try {
Task task = adapter.toTask(job);
- tasks.add(Pair.of(task, joinAsync(task)));
+ restoredTasks.add(Pair.of(task, joinAsync(task)));
}
catch (IOException e) {
log.error(e, "Error deserializing task from job [%s]",
job.getMetadata().getName());
}
}
- return tasks;
+ return restoredTasks;
}
@Override
@@ -319,7 +323,6 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
return Lists.newArrayList(tasks.values());
}
-
@Override
public Optional<ScalingStats> getScalingStats()
{
@@ -386,18 +389,18 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
return tasks.values()
- .stream()
- .filter(KubernetesWorkItem::isRunning)
- .collect(Collectors.toList());
+ .stream()
+ .filter(KubernetesWorkItem::isRunning)
+ .collect(Collectors.toList());
}
@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
return tasks.values()
- .stream()
- .filter(KubernetesWorkItem::isPending)
- .collect(Collectors.toList());
+ .stream()
+ .filter(KubernetesWorkItem::isPending)
+ .collect(Collectors.toList());
}
@Nullable
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 4a3dd322a7..0ef053a619 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -32,7 +32,6 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
@@ -237,17 +236,17 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
}
@Test
- public void test_doTask_withoutWorkItem_throwsISE()
+ public void test_doTask_withoutWorkItem_throwsRuntimeException()
{
Assert.assertThrows(
"Task [id] disappeared",
- ISE.class,
+ RuntimeException.class,
() -> runner.doTask(task, true)
);
}
@Test
- public void test_doTask_whenShutdownRequested_throwsISE()
+ public void test_doTask_whenShutdownRequested_throwsRuntimeException()
{
KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
workItem.shutdown();
@@ -256,7 +255,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
Assert.assertThrows(
"Task [id] has been shut down",
- ISE.class,
+ RuntimeException.class,
() -> runner.doTask(task, true)
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]