This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 613bd023bd65d5e74fc0fc33ea82b8e3263ae5f3
Author: Alex Heneveld <[email protected]>
AuthorDate: Thu Aug 11 22:23:00 2022 +0100

    faster strategies for determining container readiness and completion and destroy namespace can be done asynchronously because jobs/pods update only several seconds after the container is finished (using docker desktop)
---
 .../brooklyn/tasks/kubectl/ContainerCommons.java   |   6 +-
 .../tasks/kubectl/ContainerTaskFactory.java        | 456 ++++++++++++++-------
 .../brooklyn/tasks/kubectl/ContainerTaskTest.java  |   9 +-
 3 files changed, 328 insertions(+), 143 deletions(-)

diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
index d426724e38..130c25b963 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
@@ -56,10 +56,12 @@ public interface ContainerCommons {
     String NAMESPACE_CREATE_CMD = "kubectl create namespace %s";
     String NAMESPACE_SET_CMD = "kubectl config set-context --current --namespace=%s";
     String JOBS_CREATE_CMD = "kubectl apply -f %s --namespace=%s";
-    String JOBS_FEED_CMD = "kubectl wait --timeout=%ds --for=condition=complete job/%s --namespace=%s";
-    String JOBS_FEED_FAILED_CMD = "kubectl wait --timeout=%ds --for=condition=failed job/%s --namespace=%s";
+    String JOBS_WAIT_COMPLETE_CMD = "kubectl wait --timeout=%ds --for=condition=complete job/%s --namespace=%s";
+    String JOBS_WAIT_FAILED_CMD = "kubectl wait --timeout=%ds --for=condition=failed job/%s --namespace=%s";
     String JOBS_LOGS_CMD = "kubectl logs jobs/%s --namespace=%s";
+    String JOBS_DELETE_CMD = "kubectl delete job %s --namespace=%s";
     String PODS_CMD_PREFIX = "kubectl get pods --namespace=%s --selector=job-name=%s ";
+    String PODS_STATUS_STATE_CMD = PODS_CMD_PREFIX + "-ojsonpath='{.items[0].status.containerStatuses[0].state}'";
     String PODS_STATUS_PHASE_CMD = PODS_CMD_PREFIX + "-ojsonpath='{.items[0].status.phase}'";
     String PODS_NAME_CMD = PODS_CMD_PREFIX + "-ojsonpath='{.items[0].metadata.name}'";
     String PODS_EXIT_CODE_CMD = PODS_CMD_PREFIX + "-ojsonpath='{.items[0].status.containerStatuses[0].state.terminated.exitCode}'";
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
index a78c1cb6de..f1b8caf255 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
@@ -19,6 +19,7 @@ package org.apache.brooklyn.tasks.kubectl;
 
 import com.google.common.collect.Lists;
+import com.google.gson.Gson;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
@@ -52,12 +53,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -71,6 +74,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
     protected String jobIdentifier = "";
     protected final ConfigBag config = ConfigBag.newInstance();
     private String namespace;
+    private boolean namespaceRandom = false;
     private Boolean createNamespace;
     private Boolean deleteNamespace;
     Function<ContainerTaskResult,RET> returnConversion;
@@ -179,146 +183,28 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
         final CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout);
 
         // wait for it to be running (or failed / succeeded)
-
-        PodPhases podPhase = DynamicTasks.queue(Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> {
-            String phase = null;
-            long first = System.currentTimeMillis();
-            long last = first;
-            long backoffMillis = 10;
-            while (timer.isNotExpired()) {
-                phase = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask()).get().trim();
-                if (PodPhases.Running.name().equalsIgnoreCase(phase)) return PodPhases.Running;
-                if (PodPhases.Failed.name().equalsIgnoreCase(phase)) return PodPhases.Failed;
-                if (PodPhases.Succeeded.name().equalsIgnoreCase(phase)) return PodPhases.Succeeded;
-
-                if (Strings.isNonBlank(phase) && Strings.isBlank(result.kubePodName)) {
-                    result.kubePodName = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask()).get().trim();
-                }
-                if (PodPhases.Pending.name().equals(phase) && Strings.isNonBlank(result.kubePodName)) {
-                    // if pending, look for errors
-                    String failedEvents = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Get pod failed events").allowingNonZeroExitCode().newTask()).get().trim();
-                    if (!"[]".equals(failedEvents)) {
-                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask()).get().trim();
-                        throw new IllegalStateException("Job pod failed: "+failedEvents+"\n"+events);
-                    }
-                }
-
-                if (System.currentTimeMillis() - last > 10*1000) {
-                    last = System.currentTimeMillis();
-                    // every 10s log info
-                    LOG.info("Container taking long time to start ("+Duration.millis(last-first)+"): "+namespace+" "+kubeJobName+" "+result.kubePodName+" / phase '"+phase+"'");
-                    if (Strings.isNonBlank(result.kubePodName)) {
-                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask()).get().trim();
-                        LOG.info("Pod events: \n"+events);
-                    } else {
-                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask()).get().trim();
-                        LOG.info("Job events: \n"+events);
-                    }
-                }
-                long backoffMillis2 = backoffMillis;
-                Tasks.withBlockingDetails("waiting "+backoffMillis2+"ms for pod to be available (current status '" + phase + "')", () -> {
-                    Time.sleep(backoffMillis2);
-                    return null;
-                });
-                if (backoffMillis<80) backoffMillis*=2;
-            }
-            throw new IllegalStateException("Timeout waiting for pod to be available; current status is '" + phase + "'");
-        }).build()).getUnchecked();
+        PodPhases phaseOnceActive = waitForContainerAvailable(kubeJobName, result, timer);
+//        waitForContainerPodContainerState(kubeJobName, result, timer);
 
         // notify once pod is available
-        synchronized (result) {
-            result.notifyAll();
-        }
-
-        // use `wait --for` api, but in a 5s loop in case there are other issues
-        boolean succeeded = podPhase == PodPhases.Succeeded ? true : podPhase == PodPhases.Failed ? false : DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
-            while (true) {
-                LOG.debug("Container job submitted, now waiting on success or failure");
-
-                long secondsLeft = Math.min(Math.max(1, timer.getDurationRemaining().toSeconds()), 5);
-                final AtomicInteger finishCount = new AtomicInteger(0);
-
-                ProcessTaskWrapper<String> waitForSuccess = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_FEED_CMD, secondsLeft, kubeJobName, namespace))
-                        .summary("Wait for success ('complete')").allowingNonZeroExitCode().newTask());
-                Entities.submit(entity, Tasks.create("Wait for success then notify", () -> {
-                    try {
-                        if (waitForSuccess.get().contains("condition met"))
-                            LOG.debug("Container job " + namespace + " detected as completed (succeeded) in kubernetes");
-                    } finally {
-                        synchronized (finishCount) {
-                            finishCount.incrementAndGet();
-                            finishCount.notifyAll();
-                        }
-                    }
-                }));
-
-                ProcessTaskWrapper<String> waitForFailed = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_FEED_FAILED_CMD, secondsLeft, kubeJobName, namespace))
-                        .summary("Wait for failed").allowingNonZeroExitCode().newTask());
-                Entities.submit(entity, Tasks.create("Wait for failed then notify", () -> {
-                    try {
-                        if (waitForFailed.get().contains("condition met"))
-                            LOG.debug("Container job " + namespace + " detected as failed in kubernetes (may be valid non-zero exit)");
-                    } finally {
-                        synchronized (finishCount) {
-                            finishCount.incrementAndGet();
-                            finishCount.notifyAll();
-                        }
-                    }
-                }));
-
-                while (finishCount.get() == 0) {
-                    LOG.debug("Container job " + namespace + " waiting on complete or failed");
-                    try {
-                        synchronized (finishCount) {
-                            finishCount.wait(Duration.TEN_SECONDS.toMilliseconds());
-                        }
-                    } catch (InterruptedException e) {
-                        throw Exceptions.propagate(e);
-                    }
-                }
-
-                if (waitForSuccess.isDone() && waitForSuccess.getExitCode() == 0) return true;
-                if (waitForFailed.isDone() && waitForFailed.getExitCode() == 0) return false;
-                LOG.debug("Container job " + namespace + " not yet complete, will retry");
+        synchronized (result) { result.notifyAll(); }
 
-                // other one-off checks for job error, we could do here
-                // e.g. if image can't be pulled, for instance
+        boolean succeeded = PodPhases.Succeeded == phaseOnceActive ||
+                (PodPhases.Failed != phaseOnceActive &&
+                        //use `wait --for` api, but in a 5s loop in case there are other issues
+//                        waitForContainerCompletedUsingK8sWaitFor(stdout, kubeJobName, entity, timer)
+                        waitForContainerCompletedUsingPodState(stdout, kubeJobName, entity, timer)
+                );
 
-                // finally get the partial log for reporting
-                ProcessTaskWrapper<String> outputSoFarCmd = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask());
-                BrooklynTaskTags.setTransient(outputSoFarCmd.asTask());
-                outputSoFarCmd.block();
-                if (outputSoFarCmd.getExitCode()!=0) {
-                    throw new IllegalStateException("Error detected with container job while reading logs (exit code "+outputSoFarCmd.getExitCode()+"): "+outputSoFarCmd.getStdout() + " / "+outputSoFarCmd.getStderr());
-                }
-                String outputSoFar = outputSoFarCmd.get();
-                int bytesAlreadyRead = stdout.size();
-                if (bytesAlreadyRead <= outputSoFar.length()) {
-                    String newOutput = outputSoFar.substring(stdout.size());
-                    LOG.debug("Container job " + namespace + " output: " + newOutput);
-                    stdout.write(newOutput.getBytes(StandardCharsets.UTF_8));
-                } else {
-                    // not sure why this happens, but it does sometimes; for now just reset
-                    LOG.debug("Container job " + namespace + " output reset, length "+outputSoFar.length()+" less than "+bytesAlreadyRead+"; ignoring new output:\n" + outputSoFar +"\n"+new String(stdout.toByteArray()));
-                    stdout.reset();
-                    stdout.write(outputSoFar.getBytes(StandardCharsets.UTF_8));
-                }
-
-                if (timer.isExpired())
-                    throw new IllegalStateException("Timeout waiting for success or failure");
-
-                // probably timed out or job not yet available; short wait then retry
-                Time.sleep(Duration.millis(50));
-            }
-
-        }).build()).getUnchecked();
-        LOG.debug("Container job "+namespace+" completed, success "+succeeded);
+        LOG.debug("Container job "+kubeJobName+" completed, success "+succeeded);
         ProcessTaskWrapper<String> retrieveOutput = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output").newTask());
         ProcessTaskWrapper<String> retrieveExitCode = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace, kubeJobName)).summary("Retrieve exit code").newTask());
         DynamicTasks.waitForLast();
         result.mainStdout = retrieveOutput.get();
-        stdout.write(result.mainStdout.substring(stdout.size()).getBytes(StandardCharsets.UTF_8));
+
+        updateStdoutWithNewData(stdout, result.mainStdout);
         String exitCodeS = retrieveExitCode.getStdout();
         if (Strings.isNonBlank(exitCodeS)) result.mainExitCode = Integer.parseInt(exitCodeS.trim());
@@ -333,8 +219,17 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
 
             } finally {
                 if (deleteNamespaceHere) {
-                    doDeleteNamespace();
+                    doDeleteNamespace(!namespaceRandom, true);  // if a one-off job, namespace has random id in it so can safely be deleted in background (no one else risks reusing it)
+                } else {
+                    Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING);
+                    if (!Boolean.TRUE.equals(devMode)) {
+                        Entities.submit(entity, newDeleteJobTask(kubeJobName)
+                                // namespace might have been deleted in parallel so okay if we don't delete the job
+                                .allowingNonZeroExitCode()
+                                .newTask()).get();
+                    }
                 }
+                DynamicTasks.waitForLast();
             }
         } catch (Exception e) {
             throw Exceptions.propagate(e);
@@ -346,6 +241,277 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
         return taskBuilder.build();
     }
+    private Boolean waitForContainerCompletedUsingK8sWaitFor(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) {
+        return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+            while (true) {
+                LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure");
+
+                long secondsLeft = Math.min(Math.max(1, timer.getDurationRemaining().toSeconds()), 5);
+                Boolean x = checkForContainerCompletedUsingK8sWaitFor(kubeJobName, entity, secondsLeft);
+
+                if (x != null) return x;
+                LOG.debug("Container job " + namespace + " not yet complete, will retry");
+
+                // other one-off checks for job error, we could do here
+                // e.g. if image can't be pulled, for instance
+
+                refreshStdout(stdout, kubeJobName, timer);
+
+                // probably timed out or job not yet available; short wait then retry
+                Time.sleep(Duration.millis(50));
+            }
+
+        }).build()).getUnchecked();
+    }
+
+    private Boolean waitForContainerCompletedUsingPodState(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) {
+        return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+            long retryDelay = 10;
+            while (true) {
+                LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure");
+
+                PodPhases phase = checkPodPhase(kubeJobName);
+                if (phase.equals(PodPhases.Succeeded)) return true;
+                if (phase.equals(PodPhases.Failed)) return false;
+
+                LOG.debug("Container job " + namespace + " not yet complete, will sleep then retry");
+
+                // other one-off checks for job error, we could do here
+                // e.g. if image can't be pulled, for instance
+
+                refreshStdout(stdout, kubeJobName, timer);
+
+                // probably timed out or job not yet available; short wait then retry
+                Time.sleep(Duration.millis(retryDelay));
+                retryDelay *= 1.5;
+                if (retryDelay > 250) {
+                    // max out at 500ms
+                    retryDelay = 500;
+                }
+            }
+
+        }).build()).getUnchecked();
+    }
+
+    private void refreshStdout(ByteArrayOutputStream stdout, String kubeJobName, CountdownTimer timer) throws IOException {
+        // finally get the partial log for reporting
+        ProcessTaskWrapper<String> outputSoFarCmd = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask());
+        BrooklynTaskTags.setTransient(outputSoFarCmd.asTask());
+        outputSoFarCmd.block();
+        if (outputSoFarCmd.getExitCode() != 0) {
+            throw new IllegalStateException("Error detected with container job while reading logs (exit code " + outputSoFarCmd.getExitCode() + "): " + outputSoFarCmd.getStdout() + " / " + outputSoFarCmd.getStderr());
+        }
+        updateStdoutWithNewData(stdout, outputSoFarCmd.get());
+
+        if (timer.isExpired())
+            throw new IllegalStateException("Timeout waiting for success or failure");
+    }
+
+    private void updateStdoutWithNewData(ByteArrayOutputStream receiverStream, String outputFound) throws IOException {
+        int bytesAlreadyRead = receiverStream.size();
+        if (bytesAlreadyRead <= outputFound.length()) {
+            String newOutput = outputFound.substring(receiverStream.size());
+            LOG.debug("Container job " + namespace + " output: " + newOutput);
+            receiverStream.write(newOutput.getBytes(StandardCharsets.UTF_8));
+        } else {
+            // not sure why this happens, but it does sometimes; for now just reset
+            LOG.debug("Container job " + namespace + " output reset, length " + outputFound.length() + " less than " + bytesAlreadyRead + "; ignoring new output:\n" + outputFound + "\n" + new String(receiverStream.toByteArray()));
+            receiverStream.reset();
+            receiverStream.write(outputFound.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+
+    private Boolean checkForContainerCompletedUsingK8sWaitFor(String kubeJobName, Entity entity, long timeoutSeconds) {
+        final AtomicInteger finishCount = new AtomicInteger(0);
+
+        ProcessTaskWrapper<String> waitForSuccess = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_WAIT_COMPLETE_CMD, timeoutSeconds, kubeJobName, namespace))
+                .summary("Wait for success ('complete')").allowingNonZeroExitCode().newTask());
+        Entities.submit(entity, Tasks.create("Wait for success then notify", () -> {
+            try {
+                if (waitForSuccess.get().contains("condition met"))
+                    LOG.debug("Container job " + kubeJobName + " detected as completed (succeeded) in kubernetes");
+            } finally {
+                synchronized (finishCount) {
+                    finishCount.incrementAndGet();
+                    finishCount.notifyAll();
+                }
+            }
+        }));
+
+        ProcessTaskWrapper<String> waitForFailed = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_WAIT_FAILED_CMD, timeoutSeconds, kubeJobName, namespace))
+                .summary("Wait for failed").allowingNonZeroExitCode().newTask());
+        Entities.submit(entity, Tasks.create("Wait for failed then notify", () -> {
+            try {
+                if (waitForFailed.get().contains("condition met"))
+                    LOG.debug("Container job " + kubeJobName + " detected as failed in kubernetes (may be valid non-zero exit)");
+            } finally {
+                synchronized (finishCount) {
+                    finishCount.incrementAndGet();
+                    finishCount.notifyAll();
+                }
+            }
+        }));
+
+        while (finishCount.get() == 0) {
+            LOG.debug("Container job " + kubeJobName + " waiting on complete or failed");
+            try {
+                synchronized (finishCount) {
+                    finishCount.wait(Duration.TEN_SECONDS.toMilliseconds());
+                }
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            }
+        }
+
+        if (waitForSuccess.isDone() && waitForSuccess.getExitCode() == 0) return true;
+        if (waitForFailed.isDone() && waitForFailed.getExitCode() == 0) return false;
+        return null;
+    }
+
+    private Boolean checkForContainerCompletedUsingPodState(String kubeJobName, Entity entity, long timeoutSeconds) {
+        final AtomicInteger finishCount = new AtomicInteger(0);
+
+        ProcessTaskWrapper<String> waitForSuccess = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_WAIT_COMPLETE_CMD, timeoutSeconds, kubeJobName, namespace))
+                .summary("Wait for success ('complete')").allowingNonZeroExitCode().newTask());
+        Entities.submit(entity, Tasks.create("Wait for success then notify", () -> {
+            try {
+                if (waitForSuccess.get().contains("condition met"))
+                    LOG.debug("Container job " + kubeJobName + " detected as completed (succeeded) in kubernetes");
+            } finally {
+                synchronized (finishCount) {
+                    finishCount.incrementAndGet();
+                    finishCount.notifyAll();
+                }
+            }
+        }));
+
+        ProcessTaskWrapper<String> waitForFailed = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_WAIT_FAILED_CMD, timeoutSeconds, kubeJobName, namespace))
+                .summary("Wait for failed").allowingNonZeroExitCode().newTask());
+        Entities.submit(entity, Tasks.create("Wait for failed then notify", () -> {
+            try {
+                if (waitForFailed.get().contains("condition met"))
+                    LOG.debug("Container job " + kubeJobName + " detected as failed in kubernetes (may be valid non-zero exit)");
+            } finally {
+                synchronized (finishCount) {
+                    finishCount.incrementAndGet();
+                    finishCount.notifyAll();
+                }
+            }
+        }));
+
+        while (finishCount.get() == 0) {
+            LOG.debug("Container job " + kubeJobName + " waiting on complete or failed");
+            try {
+                synchronized (finishCount) {
+                    finishCount.wait(Duration.TEN_SECONDS.toMilliseconds());
+                }
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            }
+        }
+
+        if (waitForSuccess.isDone() && waitForSuccess.getExitCode() == 0) return true;
+        if (waitForFailed.isDone() && waitForFailed.getExitCode() == 0) return false;
+        return null;
+    }
+
+    private PodPhases waitForContainerAvailable(String kubeJobName, ContainerTaskResult result, CountdownTimer timer) {
+        return DynamicTasks.queue(Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> {
+            long first = System.currentTimeMillis();
+            long last = first;
+            long backoffMillis = 10;
+            PodPhases phase = PodPhases.Unknown;
+            long startupReportDelay = 1000;  // report any start longer than 1s
+            while (timer.isNotExpired()) {
+                phase = checkPodPhase(kubeJobName);
+                if (phase == PodPhases.Failed || phase == PodPhases.Succeeded || phase == PodPhases.Running) {
+                    if (startupReportDelay>5000) LOG.info("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first));
+                    else LOG.debug("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first));
+                    return phase;
+                }
+
+                if (phase!=PodPhases.Unknown && Strings.isBlank(result.kubePodName)) {
+                    result.kubePodName = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask()).get().trim();
+                }
+                if (phase == PodPhases.Pending && Strings.isNonBlank(result.kubePodName)) {
+                    // if pending, need to look for errors
+                    String failedEvents = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Check pod failed events").allowingNonZeroExitCode().newTask()).get().trim();
+                    if (!"[]".equals(failedEvents)) {
+                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events on failure").allowingNonZeroExitCode().newTask()).get().trim();
+                        throw new IllegalStateException("Job pod failed: "+failedEvents+"\n"+events);
+                    }
+                }
+
+                if (System.currentTimeMillis() - last > startupReportDelay) {
+                    last = System.currentTimeMillis();
+
+                    // log debug after 1s, then info after 5s, 20s, etc
+                    // seems bad that it often takes 1s+ just to start the container :/
+                    Consumer<String> log = startupReportDelay<3*1000 ? LOG::debug : LOG::info;
+
+                    log.accept("Container taking a while to start ("+Duration.millis(last-first)+"): "+namespace+" "+ kubeJobName +" "+ result.kubePodName+" / phase '"+phase+"'");
+                    String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim();
+                    if (Strings.isNonBlank(stateJsonS)) {
+                        log.accept("Pod state: "+stateJsonS);
+                    }
+                    if (Strings.isNonBlank(result.kubePodName)) {
+                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask()).get().trim();
+                        log.accept("Pod events: \n"+events);
+                    } else {
+                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask()).get().trim();
+                        log.accept("Job events: \n"+events);
+                    }
+
+                    // first 1s, then 5s, then every 20s
+                    startupReportDelay *= 5;
+                    if (startupReportDelay > 20*1000) startupReportDelay = 20*1000;
+                }
+                long backoffMillis2 = backoffMillis;
+                Tasks.withBlockingDetails("waiting "+backoffMillis2+"ms for pod to be available (current status '" + phase + "')", () -> {
+                    Time.sleep(backoffMillis2);
+                    return null;
+                });
+                if (backoffMillis<80) backoffMillis*=2;
+            }
+            throw new IllegalStateException("Timeout waiting for pod to be available; current status is '" + phase + "'");
+        }).build()).getUnchecked();
+    }
+
+    private PodPhases checkPodPhase(String kubeJobName) {
+        PodPhases succeeded = getPodPhaseFromContainerState(kubeJobName);
+        if (succeeded != null) return succeeded;
+
+        // this is the more official way, fall back to it if above is not recognised (eg waiting)
+        String phase = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask()).get().trim();
+        for (PodPhases candidate: PodPhases.values()) {
+            if (candidate.name().equalsIgnoreCase(phase)) return candidate;
+        }
+        return PodPhases.Unknown;
+    }
+
+    private PodPhases getPodPhaseFromContainerState(String kubeJobName) {
+        // pod container state is populated much sooner than the pod status and job fields and wait, so prefer it
+        String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim();
+        if (Strings.isNonBlank(stateJsonS)) {
+            Object stateO = new Gson().fromJson(stateJsonS, Object.class);
+            if (stateO instanceof Map) {
+                if (!((Map<?, ?>) stateO).keySet().isEmpty()) {
+                    Object stateK = (((Map<?, ?>) stateO).keySet().iterator().next());
+                    if (stateK instanceof String) {
+                        String stateS = (String) stateK;
+                        if ("terminated".equalsIgnoreCase(stateS)) return PodPhases.Succeeded;
+                        if ("running".equalsIgnoreCase(stateS)) return PodPhases.Running;
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public ProcessTaskFactory<String> newDeleteJobTask(String kubeJobName) {
+        return newSimpleTaskFactory(String.format(JOBS_DELETE_CMD, kubeJobName, namespace)).summary("Delete job");
+    }
+
     private String initNamespaceAndGetNewJobName() {
         Entity entity = BrooklynTaskTags.getContextEntity(Tasks.current());
         if (entity == null) {
@@ -374,6 +540,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                 .toLowerCase();
         if (namespace==null) {
             namespace = kubeJobName;
+            namespaceRandom = true;
         }
         return kubeJobName;
     }
@@ -382,21 +549,28 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
         return namespace;
     }
-    public boolean doDeleteNamespace() {
-        if (namespace==null) return false;
+    public ProcessTaskWrapper<String> doDeleteNamespace(boolean wait, boolean requireSuccess) {
+        if (namespace==null) return null;
         Entity entity = BrooklynTaskTags.getContextEntity(Tasks.current());
-        if (entity==null) return false;
+        if (entity==null) return null;
         // clean up - delete namespace
         Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING);
         if (Boolean.TRUE.equals(devMode)) {
-            return false;
+            return null;
         }
-        LOG.debug("Deleting namespace " + namespace);
+        LOG.info("Deleting namespace " + namespace);
         // do this not as a subtask so we can run even if the main queue fails
-        Entities.submit(entity, newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers").newTask()).block();
-        System.runFinalization();
-        return true;
+        ProcessTaskFactory<String> tf = newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers").allowingNonZeroExitCode();
+        if (!requireSuccess) tf = tf.allowingNonZeroExitCode();
+        else tf = tf.requiringExitCodeZero();
+        ProcessTaskWrapper<String> task = Entities.submit(entity, tf.newTask());
+        if (wait) {
+            task.get();
+            LOG.info("Deleted namespace " + namespace);
+            System.runFinalization();
+        }
+        return task;
     }
 
     public T summary(String summary) { this.summary = summary; return self(); }
@@ -458,7 +632,9 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
         return self();
     }
 
-    public T deleteNamespace(Boolean delete) { this.deleteNamespace = delete; return self(); }
+    public T setDeleteNamespaceAfter(Boolean delete) { this.deleteNamespace = delete; return self(); }
+    @Deprecated /** @deprecated since 1.1 when introduced */
+    public T deleteNamespace(Boolean delete) { return setDeleteNamespaceAfter(delete); }
 
     /** visible in the container environment */
     public T jobIdentifier(String jobIdentifier) {
diff --git a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
index a688ae21b3..0b85dcfb1e 100644
--- a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
@@ -30,6 +30,8 @@ import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.text.Identifiers;
 import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
 
 import java.util.HashMap;
@@ -43,10 +45,14 @@ import static org.testng.AssertJUnit.assertTrue;
 @Test(groups = {"Live"})
 public class ContainerTaskTest extends BrooklynAppUnitTestSupport {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ContainerTaskTest.class);
+
     @Test
     public void testSuccessfulContainerTask() {
+        LOG.info("Starting container test");
         TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
 
+        LOG.info("Starting dedicated container run");
         Task<ContainerTaskResult> containerTask = ContainerTaskFactory.newInstance()
                 .summary("Running container task")
                 .jobIdentifier("test-container-task")
@@ -58,6 +64,7 @@ public class ContainerTaskTest extends BrooklynAppUnitTestSupport {
         DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
 
         ContainerTaskResult result = containerTask.getUnchecked(Duration.ONE_MINUTE);
+        LOG.info("Result: "+result + " / "+result.getMainStdout().trim());
         Asserts.assertEquals(result.getMainStdout().trim(), "hello test");
     }
 
@@ -220,7 +227,7 @@ public class ContainerTaskTest extends BrooklynAppUnitTestSupport {
             Asserts.assertEquals(result.getMainStdout().trim(), "hello " + uid);
 
         } finally {
-            DynamicTasks.queueIfPossible( baseFactory.summary("cleaning up").deleteNamespace(true).bashScriptCommands("rm hello-"+uid+".sh") ).orSubmitAsync(entity);
+            DynamicTasks.queueIfPossible( baseFactory.summary("cleaning up").setDeleteNamespaceAfter(true).bashScriptCommands("rm hello-"+uid+".sh") ).orSubmitAsync(entity);
         }
     }
