This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 19fac6b1eb8ebcab4d319235158888fbb16f6f24 Author: Alex Heneveld <[email protected]> AuthorDate: Tue Aug 16 11:51:53 2022 +0100 run container tasks in background and as transient avoid polluting tasks view with boring details of how container tasks need to be run --- .../tasks/kubectl/ContainerTaskFactory.java | 96 +++++++++++++--------- 1 file changed, 58 insertions(+), 38 deletions(-) diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java index f1b8caf255..84cb436b4b 100644 --- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java +++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import com.google.gson.Gson; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.mgmt.TaskAdaptable; import org.apache.brooklyn.core.entity.BrooklynConfigKeys; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.EntityInitializers; @@ -79,6 +80,17 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp private Boolean deleteNamespace; Function<ContainerTaskResult,RET> returnConversion; + private <T extends TaskAdaptable<?>> T runTask(Entity entity, T t, boolean block, boolean markTransient) { + // previously we queued all the callers of this as sub-tasks, but that bloats the kilt diagram, so use entity.submit instead, optionally with blocking. + // most will be transient, apart from the main flow, so that they get GC'd quicker and don't clutter the kilt + //DynamicTasks.queue(t); + + if (markTransient) BrooklynTaskTags.setTransient(t.asTask()); + Entities.submit(entity, t); + if (block) { t.asTask().blockUntilEnded(Duration.PRACTICALLY_FOREVER); } + return t; + } + @Override public Task<RET> newTask() { final ByteArrayOutputStream stdout = new ByteArrayOutputStream(); @@ -152,7 +164,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp if (createNamespace==null) { createNsJobF.allowingNonZeroExitCode(); } - createNsJob = DynamicTasks.queue(createNsJobF.newTask()); + createNsJob = runTask(entity, createNsJobF.newTask(), true, true); } // only delete if told to always, unless we successfully create it @@ -177,13 +189,13 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp } } - DynamicTasks.queue( - newSimpleTaskFactory(String.format(JOBS_CREATE_CMD, jobYaml.getFile().getAbsolutePath(), namespace)).summary("Submit job").newTask()); + runTask(entity, + newSimpleTaskFactory(String.format(JOBS_CREATE_CMD, jobYaml.getFile().getAbsolutePath(), namespace)).summary("Submit job").newTask(), true, true); final CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout); // wait for it to be running (or failed / succeeded) - - PodPhases phaseOnceActive = waitForContainerAvailable(kubeJobName, result, timer); + PodPhases phaseOnceActive = waitForContainerAvailable(entity, kubeJobName, result, timer); // waitForContainerPodContainerState(kubeJobName, result, timer); // notify once pod is available @@ -198,14 +210,14 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp LOG.debug("Container job "+kubeJobName+" completed, success "+succeeded); - ProcessTaskWrapper<String> retrieveOutput = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output").newTask()); - ProcessTaskWrapper<String> retrieveExitCode = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace, kubeJobName)).summary("Retrieve exit code").newTask()); + ProcessTaskWrapper<String> retrieveOutput = runTask(entity, newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output").newTask(), false, true); + ProcessTaskWrapper<String> retrieveExitCode = runTask(entity, newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace, kubeJobName)).summary("Retrieve exit code").newTask(), false, true); - DynamicTasks.waitForLast(); result.mainStdout = retrieveOutput.get(); updateStdoutWithNewData(stdout, result.mainStdout); + retrieveExitCode.get(); String exitCodeS = retrieveExitCode.getStdout(); if (Strings.isNonBlank(exitCodeS)) result.mainExitCode = Integer.parseInt(exitCodeS.trim()); else result.mainExitCode = -1; @@ -223,10 +235,12 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp } else { Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING); if (!Boolean.TRUE.equals(devMode)) { - Entities.submit(entity, newDeleteJobTask(kubeJobName) - // namespace might have been deleted in parallel so okay if we don't delete the job - .allowingNonZeroExitCode() - .newTask()).get(); + Task<String> deletion = Entities.submit(entity, BrooklynTaskTags.setTransient(newDeleteJobTask(kubeJobName) + // namespace might have been deleted in parallel so okay if we don't delete the job; + .allowingNonZeroExitCode() + .newTask().asTask())); + // no big deal if not deleted, job ID will always be unique, so allow to delete in background and not block subsequent tasks + //deletion.get(); } } DynamicTasks.waitForLast(); @@ -242,7 +256,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp } private Boolean waitForContainerCompletedUsingK8sWaitFor(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) { - return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> { + return runTask(entity, Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> { while (true) { LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure"); @@ -255,22 +269,22 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp // other one-off checks for job error, we could do here // e.g. if image can't be pulled, for instance - refreshStdout(stdout, kubeJobName, timer); + refreshStdout(entity, stdout, kubeJobName, timer); // probably timed out or job not yet available; short wait then retry Time.sleep(Duration.millis(50)); } - }).build()).getUnchecked(); + }).build(), false, true).getUnchecked(); } private Boolean waitForContainerCompletedUsingPodState(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) { - return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> { + return runTask(entity, Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> { long retryDelay = 10; while (true) { LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure"); - PodPhases phase = checkPodPhase(kubeJobName); + PodPhases phase = checkPodPhase(entity, kubeJobName); if (phase.equals(PodPhases.Succeeded)) return true; if (phase.equals(PodPhases.Failed)) return false; @@ -279,7 +293,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp // other one-off checks for job error, we could do here // e.g. if image can't be pulled, for instance - refreshStdout(stdout, kubeJobName, timer); + refreshStdout(entity, stdout, kubeJobName, timer); // probably timed out or job not yet available; short wait then retry Time.sleep(Duration.millis(retryDelay)); @@ -290,14 +304,13 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp } } - }).build()).getUnchecked(); + }).build(), false, true).getUnchecked(); } - private void refreshStdout(ByteArrayOutputStream stdout, String kubeJobName, CountdownTimer timer) throws IOException { + private void refreshStdout(Entity entity, ByteArrayOutputStream stdout, String kubeJobName, CountdownTimer timer) throws IOException { // finally get the partial log for reporting - ProcessTaskWrapper<String> outputSoFarCmd = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask()); - BrooklynTaskTags.setTransient(outputSoFarCmd.asTask()); - outputSoFarCmd.block(); + ProcessTaskWrapper<String> outputSoFarCmd = runTask(entity, + newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask(), true, true); if (outputSoFarCmd.getExitCode() != 0) { throw new IllegalStateException("Error detected with container job while reading logs (exit code " + outputSoFarCmd.getExitCode() + "): " + outputSoFarCmd.getStdout() + " / " + outputSoFarCmd.getStderr()); } @@ -415,15 +428,15 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp return null; } - private PodPhases waitForContainerAvailable(String kubeJobName, ContainerTaskResult result, CountdownTimer timer) { - return DynamicTasks.queue(Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> { + private PodPhases waitForContainerAvailable(Entity entity, String kubeJobName, ContainerTaskResult result, CountdownTimer timer) { + return runTask(entity, Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> { long first = System.currentTimeMillis(); long last = first; long backoffMillis = 10; PodPhases phase = PodPhases.Unknown; long startupReportDelay = 1000; // report any start longer than 1s while (timer.isNotExpired()) { - phase = checkPodPhase(kubeJobName); + phase = checkPodPhase(entity, kubeJobName); if (phase == PodPhases.Failed || phase == PodPhases.Succeeded || phase == PodPhases.Running) { if (startupReportDelay>5000) LOG.info("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first)); else LOG.debug("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first)); @@ -431,13 +444,15 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp } if (phase!=PodPhases.Unknown && Strings.isBlank(result.kubePodName)) { - result.kubePodName = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask()).get().trim(); + result.kubePodName = runTask(entity, newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask(), false, true).get().trim(); } if (phase == PodPhases.Pending && Strings.isNonBlank(result.kubePodName)) { // if pending, need to look for errors - String failedEvents = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Check pod failed events").allowingNonZeroExitCode().newTask()).get().trim(); + String failedEvents = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Check pod failed events").allowingNonZeroExitCode().newTask(), + false, true).get().trim(); if (!"[]".equals(failedEvents)) { - String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events on failure").allowingNonZeroExitCode().newTask()).get().trim(); + String events = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events on failure").allowingNonZeroExitCode().newTask(), + false, false).get().trim(); throw new IllegalStateException("Job pod failed: "+failedEvents+"\n"+events); } } @@ -450,15 +465,18 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp Consumer<String> log = startupReportDelay<3*1000 ? LOG::debug : LOG::info; log.accept("Container taking a while to start ("+Duration.millis(last-first)+"): "+namespace+" "+ kubeJobName +" "+ result.kubePodName+" / phase '"+phase+"'"); - String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim(); + String stateJsonS = runTask(entity, newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask(), + false, true).get().trim(); if (Strings.isNonBlank(stateJsonS)) { log.accept("Pod state: "+stateJsonS); } if (Strings.isNonBlank(result.kubePodName)) { - String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask()).get().trim(); + String events = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask(), + false, true).get().trim(); log.accept("Pod events: \n"+events); } else { - String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask()).get().trim(); + String events = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask(), + false, true).get().trim(); log.accept("Job events: \n"+events); } @@ -474,24 +492,25 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp if (backoffMillis<80) backoffMillis*=2; } throw new IllegalStateException("Timeout waiting for pod to be available; current status is '" + phase + "'"); - }).build()).getUnchecked(); + }).build(), false, true).getUnchecked(); } - private PodPhases checkPodPhase(String kubeJobName) { - PodPhases succeeded = getPodPhaseFromContainerState(kubeJobName); + private PodPhases checkPodPhase(Entity entity, String kubeJobName) { + PodPhases succeeded = getPodPhaseFromContainerState(entity, kubeJobName); if (succeeded != null) return succeeded; // this is the more official way, fall back to it if above is not recognised (eg waiting) - String phase = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask()).get().trim(); + String phase = runTask(entity, newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask(), false, true).get().trim(); for (PodPhases candidate: PodPhases.values()) { if (candidate.name().equalsIgnoreCase(phase)) return candidate; } return PodPhases.Unknown; } - private PodPhases getPodPhaseFromContainerState(String kubeJobName) { + private PodPhases getPodPhaseFromContainerState(Entity entity, String kubeJobName) { // pod container state is populated much sooner than the pod status and job fields and wait, so prefer it - String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim(); + String stateJsonS = runTask(entity, newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask(), + false, true).get().trim(); if (Strings.isNonBlank(stateJsonS)) { Object stateO = new Gson().fromJson(stateJsonS, Object.class); if (stateO instanceof Map) { @@ -564,7 +583,8 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp ProcessTaskFactory<String> tf = newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers").allowingNonZeroExitCode(); if (!requireSuccess) tf = tf.allowingNonZeroExitCode(); else tf = tf.requiringExitCodeZero(); - ProcessTaskWrapper<String> task = Entities.submit(entity, tf.newTask()); + ProcessTaskWrapper<String> task = tf.newTask(); + Entities.submit(entity, BrooklynTaskTags.setTransient(task.asTask())); if (wait) { task.get(); LOG.info("Deleted namespace " + namespace);
