This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new bd07c3dd43 Don't need to double synchronize on simple map operations (#14435)
bd07c3dd43 is described below
commit bd07c3dd43428608678e337c7935f237a64f533d
Author: George Shiqi Wu <[email protected]>
AuthorDate: Sat Jun 17 20:30:37 2023 -0400
Don't need to double synchronize on simple map operations (#14435)
* Don't need to double synchronize on simple map operations
* remove lock
---
.../druid/k8s/overlord/KubernetesTaskRunner.java | 60 +++++++++-------------
1 file changed, 24 insertions(+), 36 deletions(-)
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index fe2f4be371..30bea70416 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -134,18 +134,16 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
@Override
public ListenableFuture<TaskStatus> run(Task task)
{
- synchronized (tasks) {
- tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task,
exec.submit(() -> runTask(task))));
- return tasks.get(task.getId()).getResult();
- }
+ return tasks.computeIfAbsent(
+ task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() ->
runTask(task)))
+ ).getResult();
}
protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
- synchronized (tasks) {
- tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task,
exec.submit(() -> joinTask(task))));
- return tasks.get(task.getId()).getResult();
- }
+ return tasks.computeIfAbsent(
+ task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() ->
joinTask(task)))
+ ).getResult();
}
private TaskStatus runTask(Task task)
@@ -163,20 +161,18 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
{
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
- synchronized (tasks) {
- KubernetesWorkItem workItem = tasks.get(task.getId());
-
- if (workItem == null) {
- throw new ISE("Task [%s] disappeared", task.getId());
- }
+ KubernetesWorkItem workItem = tasks.get(task.getId());
- if (workItem.isShutdownRequested()) {
- throw new ISE("Task [%s] has been shut down", task.getId());
- }
+ if (workItem == null) {
+ throw new ISE("Task [%s] disappeared", task.getId());
+ }
- workItem.setKubernetesPeonLifecycle(peonLifecycle);
+ if (workItem.isShutdownRequested()) {
+ throw new ISE("Task [%s] has been shut down", task.getId());
}
+ workItem.setKubernetesPeonLifecycle(peonLifecycle);
+
try {
TaskStatus taskStatus;
if (run) {
@@ -202,9 +198,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
}
finally {
- synchronized (tasks) {
- tasks.remove(task.getId());
- }
+ tasks.remove(task.getId());
}
}
@@ -322,9 +316,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
@Override
public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
{
- synchronized (tasks) {
- return Lists.newArrayList(tasks.values());
- }
+ return Lists.newArrayList(tasks.values());
}
@@ -393,23 +385,19 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
- synchronized (tasks) {
- return tasks.values()
- .stream()
- .filter(KubernetesWorkItem::isRunning)
- .collect(Collectors.toList());
- }
+ return tasks.values()
+ .stream()
+ .filter(KubernetesWorkItem::isRunning)
+ .collect(Collectors.toList());
}
@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
- synchronized (tasks) {
- return tasks.values()
- .stream()
- .filter(KubernetesWorkItem::isPending)
- .collect(Collectors.toList());
- }
+ return tasks.values()
+ .stream()
+ .filter(KubernetesWorkItem::isPending)
+ .collect(Collectors.toList());
}
@Nullable
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]