This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8850023811b Fix error where communication failures to k8s can lead to 
stuck tasks (#17431)
8850023811b is described below

commit 8850023811b27eeb25095fa9d4ce96ede9081b52
Author: George Shiqi Wu <[email protected]>
AuthorDate: Tue Nov 5 09:58:30 2024 -0800

    Fix error where communication failures to k8s can lead to stuck tasks 
(#17431)
    
    * Fix save logs error
    
    * Update 
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
    
    Co-authored-by: Kashif Faraz <[email protected]>
    
    * make things final
    
    * fix merge conflicts
    
    ---------
    
    Co-authored-by: Kashif Faraz <[email protected]>
---
 .../k8s/overlord/KubernetesPeonLifecycle.java      | 18 ++++++-------
 .../k8s/overlord/common/KubernetesPeonClient.java  |  3 ++-
 .../k8s/overlord/KubernetesPeonLifecycleTest.java  | 31 ++++++++--------------
 .../overlord/common/KubernetesPeonClientTest.java  |  3 ++-
 4 files changed, 23 insertions(+), 32 deletions(-)

diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index eaef0cba6a1..59e1d0f88b3 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -40,6 +40,7 @@ import 
org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
 import org.apache.druid.tasklogs.TaskLogs;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
@@ -326,23 +327,20 @@ public class KubernetesPeonLifecycle
   protected void saveLogs()
   {
     try {
-      Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log");
+      final Path file = Files.createTempFile(taskId.getOriginalTaskId(), 
"log");
       try {
-        startWatchingLogs();
+        final InputStream logStream;
         if (logWatch != null) {
-          FileUtils.copyInputStreamToFile(logWatch.getOutput(), file.toFile());
+          logStream = logWatch.getOutput();
         } else {
-          log.debug("Log stream not found for %s", taskId.getOriginalTaskId());
-          FileUtils.writeStringToFile(
-              file.toFile(),
-              StringUtils.format(
+          logStream = kubernetesClient.getPeonLogs(taskId).or(
+              new ByteArrayInputStream(StringUtils.format(
                   "Peon for task [%s] did not report any logs. Check k8s 
metrics and events for the pod to see what happened.",
                   taskId
-              ),
-              Charset.defaultCharset()
+              ).getBytes(StandardCharsets.UTF_8))
           );
-
         }
+        FileUtils.copyInputStreamToFile(logStream, file.toFile());
         taskLogs.pushTaskLog(taskId.getOriginalTaskId(), file.toFile());
       }
       catch (IOException e) {
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
index 63487e4e373..00cf93ba992 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
@@ -25,6 +25,7 @@ import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.dsl.LogWatch;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.RetryUtils;
@@ -266,7 +267,7 @@ public class KubernetesPeonClient
       );
     }
     catch (Exception e) {
-      throw new KubernetesResourceNotFoundException("K8s pod with label: 
job-name=" + jobName + " not found");
+      throw DruidException.defensive(e, "Error when looking for K8s pod with 
label: job-name=%s", jobName);
     }
   }
 
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 59c3700b1fc..13ad4fda209 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -46,8 +46,10 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
@@ -57,7 +59,9 @@ import java.util.concurrent.atomic.AtomicReference;
 public class KubernetesPeonLifecycleTest extends EasyMockSupport
 {
   private static final String ID = "id";
+  private static final String IP = "ip";
   private static final TaskStatus SUCCESS = TaskStatus.success(ID);
+  private static final InputStream LOG_INPUT_STREAM = new 
ByteArrayInputStream("logs for task".getBytes(StandardCharsets.UTF_8));
 
   @Mock KubernetesPeonClient kubernetesClient;
   @Mock TaskLogs taskLogs;
@@ -276,7 +280,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
     )).andReturn(new JobResponse(null, PeonPhase.FAILED));
-    
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.absent());
+    
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.absent());
     
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
 
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
@@ -325,7 +329,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
     )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
-    
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
+    
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
     EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of(
         IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), 
StandardCharsets.UTF_8)
     ));
@@ -334,8 +338,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
     EasyMock.expectLastCall().once();
     stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
-    EasyMock.expectLastCall().once();
-    logWatch.close();
     EasyMock.expectLastCall();
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
@@ -375,7 +377,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
     )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
-    
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
+    
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)).times(2);
     EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
         Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), 
StandardCharsets.UTF_8))
     );
@@ -383,14 +385,10 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     EasyMock.expectLastCall();
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
     EasyMock.expectLastCall();
-    logWatch.close();
-    EasyMock.expectLastCall();
     stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
     EasyMock.expectLastCall().once();
     stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
     EasyMock.expectLastCall().once();
-    logWatch.close();
-    EasyMock.expectLastCall();
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
 
@@ -435,7 +433,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
     )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
-    
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.absent());
+    
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.absent());
     
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
     EasyMock.expectLastCall();
@@ -483,7 +481,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
     )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
-    
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
+    
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
     EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException());
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
     EasyMock.expectLastCall();
@@ -491,9 +489,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     EasyMock.expectLastCall().once();
     stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
     EasyMock.expectLastCall().once();
-    logWatch.close();
-    EasyMock.expectLastCall();
-
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
 
     replayAll();
@@ -533,7 +528,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
     )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
-    
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
+    
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
     EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
         Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), 
StandardCharsets.UTF_8))
     );
@@ -543,8 +538,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     EasyMock.expectLastCall().once();
     stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
     EasyMock.expectLastCall().once();
-    logWatch.close();
-    EasyMock.expectLastCall();
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
 
@@ -576,15 +569,13 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     )).andThrow(new RuntimeException());
 
     // We should still try to push logs
-    
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
+    
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
     EasyMock.expectLastCall();
     stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
     EasyMock.expectLastCall().once();
     stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
     EasyMock.expectLastCall().once();
-    logWatch.close();
-    EasyMock.expectLastCall();
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
 
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
index fa0da14fab7..42ec881dbc5 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
@@ -30,6 +30,7 @@ import 
io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
 import io.fabric8.kubernetes.client.dsl.LogWatch;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
@@ -525,7 +526,7 @@ public class KubernetesPeonClientTest
   void 
test_getPeonPodWithRetries_withoutPod_raisesKubernetesResourceNotFoundException()
   {
     Assertions.assertThrows(
-        KubernetesResourceNotFoundException.class,
+        DruidException.class,
         () -> instance.getPeonPodWithRetries(clientApi.getClient(), new 
K8sTaskId(ID).getK8sJobName(), 1, 1),
         StringUtils.format("K8s pod with label: job-name=%s not found", ID)
     );


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to