This is an automated email from the ASF dual-hosted git repository.
davidlim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 95b0de61d1 Move some lifecycle management from doTask -> shutdown for
the mm-less task runner (#14895)
95b0de61d1 is described below
commit 95b0de61d172a323aead25571b8b83b4ae0b7120
Author: George Shiqi Wu <[email protected]>
AuthorDate: Fri Aug 25 12:50:38 2023 -0400
Move some lifecycle management from doTask -> shutdown for the mm-less task
runner (#14895)
* save work
* Add synchronized
* Don't shutdown in run
* Adding unit tests
* Cleanup lifecycle
* Fix tests
* remove newline
---
.../k8s/overlord/KubernetesPeonLifecycle.java | 9 ++--
.../druid/k8s/overlord/KubernetesTaskRunner.java | 24 +++++----
.../druid/k8s/overlord/KubernetesWorkItem.java | 9 ----
.../k8s/overlord/KubernetesPeonLifecycleTest.java | 14 ++---
.../k8s/overlord/KubernetesTaskRunnerTest.java | 62 +++++++++++-----------
.../druid/k8s/overlord/KubernetesWorkItemTest.java | 2 -
6 files changed, 52 insertions(+), 68 deletions(-)
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index f6b15f46bc..4814d8cbb6 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -137,7 +137,6 @@ public class KubernetesPeonLifecycle
}
catch (Exception e) {
log.info("Failed to run task: %s", taskId.getOriginalTaskId());
- shutdown();
throw e;
}
finally {
@@ -168,10 +167,9 @@ public class KubernetesPeonLifecycle
finally {
try {
saveLogs();
- shutdown();
}
catch (Exception e) {
- log.warn(e, "Task [%s] cleanup failed", taskId);
+ log.warn(e, "Log processing failed for task [%s]", taskId);
}
stopTask();
@@ -188,7 +186,7 @@ public class KubernetesPeonLifecycle
*/
protected void shutdown()
{
- if (State.PENDING.equals(state.get()) ||
State.RUNNING.equals(state.get())) {
+ if (State.PENDING.equals(state.get()) || State.RUNNING.equals(state.get())
|| State.STOPPED.equals(state.get())) {
kubernetesClient.deletePeonJob(taskId);
}
}
@@ -223,7 +221,7 @@ public class KubernetesPeonLifecycle
*/
protected TaskLocation getTaskLocation()
{
- if (!State.RUNNING.equals(state.get())) {
+ if (State.PENDING.equals(state.get()) ||
State.NOT_STARTED.equals(state.get())) {
log.debug("Can't get task location for non-running job. [%s]",
taskId.getOriginalTaskId());
return TaskLocation.unknown();
}
@@ -251,7 +249,6 @@ public class KubernetesPeonLifecycle
Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED,
"false")),
pod.getMetadata() != null ? pod.getMetadata().getName() : ""
);
- log.info("K8s task %s is running at location %s",
taskId.getOriginalTaskId(), taskLocation);
}
return taskLocation;
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 9a4e4bcca6..24e21b0b4e 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -182,10 +182,6 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
KubernetesWorkItem workItem = tasks.get(task.getId());
if (workItem == null) {
- throw new ISE("Task [%s] disappeared", task.getId());
- }
-
- if (workItem.isShutdownRequested()) {
throw new ISE("Task [%s] has been shut down", task.getId());
}
@@ -213,11 +209,6 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
log.error(e, "Task [%s] execution caught an exception", task.getId());
throw new RuntimeException(e);
}
- finally {
- synchronized (tasks) {
- tasks.remove(task.getId());
- }
- }
}
@VisibleForTesting
@@ -271,6 +262,10 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
return;
}
+ synchronized (tasks) {
+ tasks.remove(taskid);
+ }
+
workItem.shutdown();
}
@@ -440,6 +435,17 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
.collect(Collectors.toList());
}
+ @Override
+ public TaskLocation getTaskLocation(String taskId)
+ {
+ final KubernetesWorkItem workItem = tasks.get(taskId);
+ if (workItem == null) {
+ return TaskLocation.unknown();
+ } else {
+ return workItem.getLocation();
+ }
+ }
+
@Nullable
@Override
public RunnerTaskState getRunnerTaskState(String taskId)
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
index 164a82b6ae..94d4bbb67f 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
@@ -30,13 +30,10 @@ import
org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.java.util.common.ISE;
import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicBoolean;
public class KubernetesWorkItem extends TaskRunnerWorkItem
{
private final Task task;
-
- private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
private KubernetesPeonLifecycle kubernetesPeonLifecycle = null;
public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus>
statusFuture)
@@ -53,7 +50,6 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
protected synchronized void shutdown()
{
- this.shutdownRequested.set(true);
if (this.kubernetesPeonLifecycle != null) {
this.kubernetesPeonLifecycle.startWatchingLogs();
@@ -61,11 +57,6 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
}
}
- protected boolean isShutdownRequested()
- {
- return shutdownRequested.get();
- }
-
protected boolean isPending()
{
return RunnerTaskState.PENDING.equals(getRunnerTaskState());
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 3a017e5f74..084d1db62d 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -198,9 +198,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(null);
- EasyMock.expect(kubernetesClient.deletePeonJob(
- new K8sTaskId(ID)
- )).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
EasyMock.expectLastCall().once();
@@ -245,7 +242,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
replayAll();
@@ -298,7 +294,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
@@ -353,7 +348,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
@@ -408,7 +402,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
@@ -459,7 +452,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
@@ -512,7 +504,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
@@ -554,8 +545,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
-
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
replayAll();
@@ -908,8 +897,11 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.STOPPED);
+
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once();
+ replayAll();
Assert.assertEquals(TaskLocation.unknown(),
peonLifecycle.getTaskLocation());
+ verifyAll();
}
private void setPeonLifecycleState(KubernetesPeonLifecycle peonLifecycle,
KubernetesPeonLifecycle.State state)
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index ca1fc64171..bee3a533c7 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -152,8 +152,6 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
Assert.assertEquals(taskStatus, future.get());
verifyAll();
-
- Assert.assertFalse(runner.tasks.containsKey(task.getId()));
}
@Test
@@ -191,8 +189,6 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
Assert.assertTrue(e.getCause() instanceof RuntimeException);
verifyAll();
-
- Assert.assertFalse(runner.tasks.containsKey(task.getId()));
}
@Test
@@ -208,8 +204,6 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
Assert.assertEquals(taskStatus, future.get());
verifyAll();
-
- Assert.assertFalse(runner.tasks.containsKey(task.getId()));
}
@Test
@@ -236,28 +230,11 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
Assert.assertTrue(e.getCause() instanceof RuntimeException);
verifyAll();
-
- Assert.assertFalse(runner.tasks.containsKey(task.getId()));
- }
-
- @Test
- public void test_doTask_withoutWorkItem_throwsRuntimeException()
- {
- Assert.assertThrows(
- "Task [id] disappeared",
- RuntimeException.class,
- () -> runner.doTask(task, true)
- );
}
@Test
public void test_doTask_whenShutdownRequested_throwsRuntimeException()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
- workItem.shutdown();
-
- runner.tasks.put(task.getId(), workItem);
-
Assert.assertThrows(
"Task [id] has been shut down",
RuntimeException.class,
@@ -266,13 +243,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
}
@Test
- public void test_shutdown_withoutExistingTask()
- {
- runner.shutdown(task.getId(), "");
- }
-
- @Test
- public void test_shutdown_withExistingTask()
+ public void test_shutdown_withExistingTask_removesTaskFromMap()
{
KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
@Override
@@ -282,7 +253,13 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
};
runner.tasks.put(task.getId(), workItem);
+ runner.shutdown(task.getId(), "");
+ Assert.assertTrue(runner.tasks.isEmpty());
+ }
+ @Test
+ public void test_shutdown_withoutExistingTask()
+ {
runner.shutdown(task.getId(), "");
}
@@ -629,6 +606,30 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
verifyAll();
}
+ @Test
+ public void test_getTaskLocation_withExistingTask()
+ {
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+ @Override
+ public TaskLocation getLocation()
+ {
+ return TaskLocation.create("host", 0, 1, false);
+ }
+ };
+
+ runner.tasks.put(task.getId(), workItem);
+
+ TaskLocation taskLocation = runner.getTaskLocation(task.getId());
+ Assert.assertEquals(TaskLocation.create("host", 0, 1, false),
taskLocation);
+ }
+
+ @Test
+ public void test_getTaskLocation_noTaskFound()
+ {
+ TaskLocation taskLocation = runner.getTaskLocation(task.getId());
+ Assert.assertEquals(TaskLocation.unknown(), taskLocation);
+ }
+
@Test
public void test_getTotalCapacity()
{
@@ -644,6 +645,5 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
Assert.assertEquals(1, runner.getUsedCapacity());
runner.tasks.remove(task.getId());
Assert.assertEquals(0, runner.getUsedCapacity());
-
}
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
index 5f95177048..d5cf2ea725 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
@@ -75,7 +75,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
public void test_shutdown_withoutKubernetesPeonLifecycle()
{
workItem.shutdown();
- Assert.assertTrue(workItem.isShutdownRequested());
}
@Test
@@ -91,7 +90,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
workItem.shutdown();
verifyAll();
- Assert.assertTrue(workItem.isShutdownRequested());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]