This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8850023811b Fix error where communication failures to k8s can lead to
stuck tasks (#17431)
8850023811b is described below
commit 8850023811b27eeb25095fa9d4ce96ede9081b52
Author: George Shiqi Wu <[email protected]>
AuthorDate: Tue Nov 5 09:58:30 2024 -0800
Fix error where communication failures to k8s can lead to stuck tasks
(#17431)
* Fix save logs error
* Update extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
Co-authored-by: Kashif Faraz <[email protected]>
* make things final
* fix merge conflicts
---------
Co-authored-by: Kashif Faraz <[email protected]>
---
.../k8s/overlord/KubernetesPeonLifecycle.java | 18 ++++++-------
.../k8s/overlord/common/KubernetesPeonClient.java | 3 ++-
.../k8s/overlord/KubernetesPeonLifecycleTest.java | 31 ++++++++--------------
.../overlord/common/KubernetesPeonClientTest.java | 3 ++-
4 files changed, 23 insertions(+), 32 deletions(-)
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index eaef0cba6a1..59e1d0f88b3 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -40,6 +40,7 @@ import
org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.tasklogs.TaskLogs;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
@@ -326,23 +327,20 @@ public class KubernetesPeonLifecycle
protected void saveLogs()
{
try {
- Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log");
+ final Path file = Files.createTempFile(taskId.getOriginalTaskId(),
"log");
try {
- startWatchingLogs();
+ final InputStream logStream;
if (logWatch != null) {
- FileUtils.copyInputStreamToFile(logWatch.getOutput(), file.toFile());
+ logStream = logWatch.getOutput();
} else {
- log.debug("Log stream not found for %s", taskId.getOriginalTaskId());
- FileUtils.writeStringToFile(
- file.toFile(),
- StringUtils.format(
+ logStream = kubernetesClient.getPeonLogs(taskId).or(
+ new ByteArrayInputStream(StringUtils.format(
"Peon for task [%s] did not report any logs. Check k8s metrics and events for the pod to see what happened.",
taskId
- ),
- Charset.defaultCharset()
+ ).getBytes(StandardCharsets.UTF_8))
);
-
}
+ FileUtils.copyInputStreamToFile(logStream, file.toFile());
taskLogs.pushTaskLog(taskId.getOriginalTaskId(), file.toFile());
}
catch (IOException e) {
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
index 63487e4e373..00cf93ba992 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
@@ -25,6 +25,7 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.RetryUtils;
@@ -266,7 +267,7 @@ public class KubernetesPeonClient
);
}
catch (Exception e) {
- throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + jobName + " not found");
+ throw DruidException.defensive(e, "Error when looking for K8s pod with label: job-name=%s", jobName);
}
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 59c3700b1fc..13ad4fda209 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -46,8 +46,10 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
@@ -57,7 +59,9 @@ import java.util.concurrent.atomic.AtomicReference;
public class KubernetesPeonLifecycleTest extends EasyMockSupport
{
private static final String ID = "id";
+ private static final String IP = "ip";
private static final TaskStatus SUCCESS = TaskStatus.success(ID);
+ private static final InputStream LOG_INPUT_STREAM = new
ByteArrayInputStream("logs for task".getBytes(StandardCharsets.UTF_8));
@Mock KubernetesPeonClient kubernetesClient;
@Mock TaskLogs taskLogs;
@@ -276,7 +280,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(null, PeonPhase.FAILED));
-
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.absent());
+
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.absent());
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
@@ -325,7 +329,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
-
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
+
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of(
IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS),
StandardCharsets.UTF_8)
));
@@ -334,8 +338,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
- EasyMock.expectLastCall().once();
- logWatch.close();
EasyMock.expectLastCall();
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
@@ -375,7 +377,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
-
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
+
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)).times(2);
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS),
StandardCharsets.UTF_8))
);
@@ -383,14 +385,10 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expectLastCall();
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
- logWatch.close();
- EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
- logWatch.close();
- EasyMock.expectLastCall();
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
@@ -435,7 +433,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
-
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.absent());
+
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.absent());
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
@@ -483,7 +481,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
-
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
+
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
@@ -491,9 +489,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
- logWatch.close();
- EasyMock.expectLastCall();
-
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
replayAll();
@@ -533,7 +528,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
-
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
+
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS),
StandardCharsets.UTF_8))
);
@@ -543,8 +538,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
- logWatch.close();
- EasyMock.expectLastCall();
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
@@ -576,15 +569,13 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
)).andThrow(new RuntimeException());
// We should still try to push logs
-
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
+
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
- logWatch.close();
- EasyMock.expectLastCall();
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
index fa0da14fab7..42ec881dbc5 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
@@ -30,6 +30,7 @@ import
io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
@@ -525,7 +526,7 @@ public class KubernetesPeonClientTest
void
test_getPeonPodWithRetries_withoutPod_raisesKubernetesResourceNotFoundException()
{
Assertions.assertThrows(
- KubernetesResourceNotFoundException.class,
+ DruidException.class,
() -> instance.getPeonPodWithRetries(clientApi.getClient(), new
K8sTaskId(ID).getK8sJobName(), 1, 1),
StringUtils.format("K8s pod with label: job-name=%s not found", ID)
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]