This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
The following commit(s) were added to refs/heads/master by this push:
new 9ea3d2d981 rejig container task
9ea3d2d981 is described below
commit 9ea3d2d981dcc03e173fd4f61fbc20d5a65e5b1b
Author: Alex Heneveld <[email protected]>
AuthorDate: Tue Jul 19 17:59:51 2022 +0100
rejig container task
* to track output, gather exit code, and clearer semantics
* also fix dirty detection across all processes
---
.../core/effector/ssh/SshEffectorTasks.java | 1 +
.../system/internal/SystemProcessTaskFactory.java | 1 +
.../brooklyn/tasks/kubectl/ContainerCommons.java | 25 +-
.../brooklyn/tasks/kubectl/ContainerEffector.java | 10 +-
.../brooklyn/tasks/kubectl/ContainerSensor.java | 14 +-
.../tasks/kubectl/ContainerTaskFactory.java | 373 ++++++++++++++++-----
.../{JobBuilder.java => KubeJobFileCreator.java} | 49 +--
.../tasks/kubectl/ContainerEffectorTest.java | 5 +-
.../tasks/kubectl/ContainerSensorTest.java | 6 +-
.../brooklyn/tasks/kubectl/ContainerTaskTest.java | 224 ++++++++-----
...uilderTest.java => KubeJobSpecCreatorTest.java} | 58 ++--
11 files changed, 527 insertions(+), 239 deletions(-)
diff --git
a/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshEffectorTasks.java
b/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshEffectorTasks.java
index c29563681d..d101b5c4a0 100644
---
a/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshEffectorTasks.java
+++
b/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshEffectorTasks.java
@@ -122,6 +122,7 @@ public class SshEffectorTasks {
}
@Override
public synchronized ProcessTaskWrapper<RET> newTask() {
+ dirty = false;
Entity entity =
BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
if (machine==null) {
if (log.isDebugEnabled())
diff --git
a/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
b/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
index afe5d3346b..56be6cfc6f 100644
---
a/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
+++
b/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
@@ -68,6 +68,7 @@ public class SystemProcessTaskFactory<T extends
SystemProcessTaskFactory<T,RET>,
@Override
public ProcessTaskWrapper<RET> newTask() {
+ dirty = false;
return new SystemProcessTaskWrapper();
}
diff --git
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
index 4601a9ce02..5dbdf41af5 100644
---
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
+++
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
@@ -21,7 +21,6 @@ package org.apache.brooklyn.tasks.kubectl;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.BasicConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.config.SetConfigKey;
import org.apache.brooklyn.util.time.Duration;
@@ -36,14 +35,16 @@ public interface ContainerCommons {
ConfigKey<PullPolicy> CONTAINER_IMAGE_PULL_POLICY =
ConfigKeys.newConfigKey(new TypeToken<PullPolicy>() {} ,
"imagePullPolicy", "Container image pull policy. Allowed values:
{IfNotPresent, Always, Never}. ", PullPolicy.ALWAYS);
- ConfigKey<String> CONTAINER_NAME =
ConfigKeys.newStringConfigKey("containerName", "Container name");
ConfigKey<Boolean> KEEP_CONTAINER_FOR_DEBUGGING =
ConfigKeys.newBooleanConfigKey("keepContainerForDebugging", "When set to true,
the namespace" +
" and associated resources and services are not destroyed after
execution. Defaults value is 'false'.", Boolean.FALSE);
- ConfigKey<List> COMMANDS = ConfigKeys.newConfigKey(List.class,"commands",
"Commands to execute for container", Lists.newArrayList());
- ConfigKey<List> ARGUMENTS = ConfigKeys.newConfigKey(List.class,"args",
"Arguments to execute for container", Lists.newArrayList());
+ ConfigKey<Object> BASH_SCRIPT =
ConfigKeys.newConfigKey(Object.class,"bashScript", "A bash script (as string or
list of strings) to run, implies command '/bin/bash' '-c' and replaces
arguments");
+ ConfigKey<List> COMMAND = ConfigKeys.newConfigKey(List.class,"command",
"Single command and optional arguments to execute for the container (overrides
image EntryPoint and Cmd)", Lists.newArrayList());
+ ConfigKey<List> ARGUMENTS = ConfigKeys.newConfigKey(List.class,"args",
"Additional arguments to pass to the command at the container (in addition to
the command supplied here or the default in the image)", Lists.newArrayList());
- ConfigKey<Duration> TIMEOUT = ConfigKeys.newConfigKey(Duration.class,
"timeout", "Container wait timeout", Duration.minutes(1));
+ ConfigKey<Duration> TIMEOUT = ConfigKeys.newConfigKey(Duration.class,
"timeout", "Container execution timeout (default 5 minutes)",
Duration.minutes(5));
+
+ ConfigKey<Boolean> REQUIRE_EXIT_CODE_ZERO =
ConfigKeys.newConfigKey(Boolean.class, "requireExitCodeZero", "Whether task
should fail if container returns non-zero exit code (default true)", true);
ConfigKey<String> WORKING_DIR =
ConfigKeys.newStringConfigKey("workingDir", "Location where the container
commands are executed");
ConfigKey<Set<Map<String,String>>> VOLUME_MOUNTS = new
SetConfigKey.Builder<>(new TypeToken<Map<String,String>>() {}, "volumeMounts")
@@ -52,11 +53,13 @@ public interface ContainerCommons {
ConfigKey<Set<Map<String,Object>>> VOLUMES = new SetConfigKey.Builder(new
TypeToken<Map<String,Object>>() {}, "volumes")
.description("List of directories with data that is accessible
across multiple containers").defaultValue(null).build();
- String NAMESPACE_CREATE_CMD = "kubectl create namespace brooklyn-%s"; //
namespace name
- String NAMESPACE_SET_CMD = "kubectl config set-context --current
--namespace=brooklyn-%s"; // namespace name
- String JOBS_CREATE_CMD = "kubectl apply -f %s"; // deployment.yaml
absolute path
- String JOBS_FEED_CMD = "kubectl wait --timeout=%ds
--for=condition=complete job/%s"; // timeout, containerName
- String JOBS_LOGS_CMD = "kubectl logs jobs/%s"; // containerName
- String NAMESPACE_DELETE_CMD = "kubectl delete namespace brooklyn-%s"; //
namespace name
+ String NAMESPACE_CREATE_CMD = "kubectl create namespace %s";
+ String NAMESPACE_SET_CMD = "kubectl config set-context --current
--namespace=%s";
+ String JOBS_CREATE_CMD = "kubectl apply -f %s --namespace=%s";
+ String JOBS_FEED_CMD = "kubectl wait --timeout=%ds
--for=condition=complete job/%s --namespace=%s";
+ String JOBS_FEED_FAILED_CMD = "kubectl wait --timeout=%ds
--for=condition=failed job/%s --namespace=%s";
+ String JOBS_LOGS_CMD = "kubectl logs jobs/%s --namespace=%s";
+ String PODS_EXIT_CODE_CMD = "kubectl get pods --namespace=%s
-ojsonpath='{.items[0].status.containerStatuses[0].state.terminated.exitCode}'";
+ String NAMESPACE_DELETE_CMD = "kubectl delete namespace %s";
}
diff --git
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerEffector.java
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerEffector.java
index 5b34ae604e..0c41de1a24 100644
---
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerEffector.java
+++
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerEffector.java
@@ -26,12 +26,9 @@ import org.apache.brooklyn.core.effector.Effectors;
import org.apache.brooklyn.core.entity.EntityInitializers;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.task.DynamicTasks;
-import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.TimeUnit;
-
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.brooklyn.core.mgmt.BrooklynTaskTags.EFFECTOR_TAG;
@@ -69,14 +66,13 @@ public class ContainerEffector extends
AddEffectorInitializerAbstract implements
@Override
public String call(ConfigBag parameters) {
ConfigBag configBag =
ConfigBag.newInstanceCopying(this.params).putAll(parameters);
- Task<String> containerTask = new
ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new
ContainerTaskFactory.ConcreteContainerTaskFactory()
.summary("Executing Container Image: " +
EntityInitializers.resolve(configBag, CONTAINER_IMAGE))
- .tag(entity().getId() + "-" + EFFECTOR_TAG)
+ .jobIdentifier(entity().getId() + "-" + EFFECTOR_TAG)
.configure(configBag.getAllConfig())
.newTask();
DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity());
- Object result = containerTask.getUnchecked(Duration.of(5,
TimeUnit.MINUTES));
- return result.toString();
+ return containerTask.getUnchecked().getMainStdout();
}
}
}
diff --git
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
index 0d54e40680..7bb8dccc4e 100644
---
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
+++
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
@@ -18,7 +18,6 @@
*/
package org.apache.brooklyn.tasks.kubectl;
-import com.google.common.collect.Iterables;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.AttributeSensor;
@@ -35,7 +34,6 @@ import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@@ -77,18 +75,14 @@ public class ContainerSensor<T> extends
AbstractAddSensorFeed<T> implements Cont
.callable(new Callable<Object>() {
@Override
public Object call() throws Exception {
- Task<String> containerTask = new
ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
+ Task<ContainerTaskFactory.ContainerTaskResult>
containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory()
.summary("Running " +
EntityInitializers.resolve(configBag, SENSOR_NAME))
- .tag(entity.getId() + "-" + SENSOR_TAG)
+ .jobIdentifier(entity.getId() + "-" +
SENSOR_TAG)
.configure(configBag.getAllConfig())
.newTask();
DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
- Object result =
containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES));
- List<String> res = (List<String>) result;
- while(!res.isEmpty() &&
Iterables.getLast(res).matches("namespace .* deleted\\s*")) res =
res.subList(0, res.size()-1);
-
- String res2 = res.isEmpty() ? null :
Iterables.getLast(res);
- return (new
SshCommandSensor.CoerceOutputFunction<>(sensor.getTypeToken(),
initParam(FORMAT), initParam(LAST_YAML_DOCUMENT))).apply(res2);
+ String mainStdout =
containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES)).getMainStdout();
+ return (new
SshCommandSensor.CoerceOutputFunction<>(sensor.getTypeToken(),
initParam(FORMAT), initParam(LAST_YAML_DOCUMENT))).apply(mainStdout);
}
})
.suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
diff --git
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
index 7e9a5910df..a03f6f2b1a 100644
---
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
+++
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
@@ -19,103 +19,313 @@
package org.apache.brooklyn.tasks.kubectl;
import com.google.common.collect.Lists;
+import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInitializers;
+import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.mgmt.ha.BrooklynBomOsgiArchiveInstaller;
+import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.json.ShellEnvironmentSerializer;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.core.task.TaskBuilder;
import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory;
import org.apache.brooklyn.util.core.task.system.ProcessTaskStub;
import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
import
org.apache.brooklyn.util.core.task.system.internal.SystemProcessTaskFactory;
+import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.CountdownTimer;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static org.apache.brooklyn.tasks.kubectl.ContainerCommons.*;
-public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET>
implements TaskFactory<Task<RET>> {
+public class ContainerTaskFactory<T extends ContainerTaskFactory<T>>
implements TaskFactory<Task<ContainerTaskFactory.ContainerTaskResult>> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ContainerTaskFactory.class);
protected String summary;
- protected String tag = "";
+ protected String jobIdentifier = "";
protected final ConfigBag config = ConfigBag.newInstance();
+ private String namespace;
+ private Boolean createNamespace;
+ private Boolean deleteNamespace;
@Override
- public Task<RET> newTask() {
- ConfigBag configBag = this.config;
-
- List<String> commandsCfg = EntityInitializers.resolve(configBag,
COMMANDS);
- List<String> argumentsCfg = EntityInitializers.resolve(configBag,
ARGUMENTS);
- String containerImage = EntityInitializers.resolve(configBag,
CONTAINER_IMAGE);
- PullPolicy containerImagePullPolicy =
EntityInitializers.resolve(configBag, CONTAINER_IMAGE_PULL_POLICY);
- String containerNameFromCfg = EntityInitializers.resolve(configBag,
CONTAINER_NAME);
- Boolean devMode = EntityInitializers.resolve(configBag,
KEEP_CONTAINER_FOR_DEBUGGING);
-
- String workingDir = EntityInitializers.resolve(configBag, WORKING_DIR);
- Set<Map<String,String>> volumeMounts = (Set<Map<String,String>>)
EntityInitializers.resolve(configBag, VOLUME_MOUNTS);
- Set<Map<String, Object>> volumes = (Set<Map<String, Object>>)
EntityInitializers.resolve(configBag, VOLUMES);
-
- if(Strings.isBlank(containerImage)) {
- throw new IllegalStateException("You must specify containerImage
when using " + this.getClass().getSimpleName());
- }
+ public Task<ContainerTaskResult> newTask() {
+ final ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+ TaskBuilder<ContainerTaskResult> taskBuilder =
Tasks.<ContainerTaskResult>builder().dynamic(true)
+ .displayName(this.summary)
+
.tag(BrooklynTaskTags.tagForStream(BrooklynTaskTags.STREAM_STDOUT, stdout))
+ .body(()-> {
+ List<String> commandsCfg =
EntityInitializers.resolve(config, COMMAND);
+ List<String> argumentsCfg =
EntityInitializers.resolve(config, ARGUMENTS);
- final String cleanImageName = containerImage.contains(":") ?
containerImage.substring(0, containerImage.indexOf(":")) : containerImage;
-
- final String containerName = (Strings.isBlank(containerNameFromCfg)
- ? ( (Strings.isNonBlank(this.tag) ? this.tag + "-" :
"").concat(cleanImageName).concat("-").concat(Strings.makeRandomId(10)))
- : containerNameFromCfg)
- .replaceAll("[^A-Za-z0-9-]", "") // remove all symbols
- .toLowerCase();
-
- final String jobYamlLocation = new JobBuilder()
- .withImage(containerImage)
- .withImagePullPolicy(containerImagePullPolicy)
- .withName(containerName)
- .withArgs(argumentsCfg)
- .withEnv(EntityInitializers.resolve(configBag,
BrooklynConfigKeys.SHELL_ENVIRONMENT))
- .withCommands(Lists.newArrayList(commandsCfg))
- .withVolumeMounts(volumeMounts)
- .withVolumes(volumes)
- .withWorkingDir(workingDir)
- .build();
-
-
- final long timeoutInSeconds = EntityInitializers.resolve(configBag,
TIMEOUT).toSeconds();
- Task<String> runCommandsTask = buildKubeTask(configBag, "Submit job",
String.format(JOBS_CREATE_CMD,jobYamlLocation)).asTask();
- Task<String> waitTask = buildKubeTask(configBag, "Wait For
Completion",
String.format(JOBS_FEED_CMD,timeoutInSeconds,containerName)).asTask();
- if(!devMode) {
- // making these two inessential to insure proper namespace cleanup
- BrooklynTaskTags.addTagDynamically(runCommandsTask,
BrooklynTaskTags.INESSENTIAL_TASK);
- BrooklynTaskTags.addTagDynamically(waitTask,
BrooklynTaskTags.INESSENTIAL_TASK);
- }
+ Object bashScript = EntityInitializers.resolve(config,
BASH_SCRIPT);
+ if (bashScript!=null) {
+ if (!commandsCfg.isEmpty()) LOG.warn("Ignoring
'command' "+commandsCfg+" because bashScript is set");
+ if (!argumentsCfg.isEmpty()) LOG.warn("Ignoring 'args'
"+argumentsCfg+" because bashScript is set");
+
+ commandsCfg = MutableList.of("/bin/bash", "-c");
+ List<Object> argumentsCfgO = bashScript instanceof
Iterable ? MutableList.copyOf((Iterable) commandsCfg) :
MutableList.of(bashScript);
+ argumentsCfg =
MutableList.of(argumentsCfgO.stream().map(x ->
""+x).collect(Collectors.joining("\n")));
+ }
+
+ String containerImage = EntityInitializers.resolve(config,
CONTAINER_IMAGE);
+ PullPolicy containerImagePullPolicy =
EntityInitializers.resolve(config, CONTAINER_IMAGE_PULL_POLICY);
+ Boolean devMode = EntityInitializers.resolve(config,
KEEP_CONTAINER_FOR_DEBUGGING);
+
+ String workingDir = EntityInitializers.resolve(config,
WORKING_DIR);
+ Set<Map<String,String>> volumeMounts =
(Set<Map<String,String>>) EntityInitializers.resolve(config, VOLUME_MOUNTS);
+ Set<Map<String, Object>> volumes = (Set<Map<String,
Object>>) EntityInitializers.resolve(config, VOLUMES);
+
+ if(Strings.isBlank(containerImage)) {
+ throw new IllegalStateException("You must specify
containerImage when using " + this.getClass().getSimpleName());
+ }
+
+ Entity entity =
BrooklynTaskTags.getContextEntity(Tasks.current());
+ if (entity == null) {
+ throw new IllegalStateException("Task must run in
context of entity to background jobs");
+ }
+
+ final String cleanImageName = containerImage.contains(":")
? containerImage.substring(0, containerImage.indexOf(":")) : containerImage;
+
+ final String containerName =
(Strings.isNonBlank(this.jobIdentifier) ? this.jobIdentifier + "-" : "")
+
.concat(cleanImageName).concat("-").concat(Strings.makeRandomId(10))
+ .replaceAll("[^A-Za-z0-9-]", "") // remove all
symbols
+ .toLowerCase();
+ if (namespace==null) {
+ namespace = "brooklyn-" + containerName;
+ }
+
+ LOG.debug("Submitting container job in namespace
"+namespace);
+
+ Map<String, String> env = new
ShellEnvironmentSerializer(((EntityInternal)entity).getManagementContext()).serialize(EntityInitializers.resolve(config,
BrooklynConfigKeys.SHELL_ENVIRONMENT));
+ final
BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> jobYaml = new
KubeJobFileCreator()
+ .withImage(containerImage)
+ .withImagePullPolicy(containerImagePullPolicy)
+ .withName(containerName)
+ .withArgs(argumentsCfg)
+ .withEnv(env)
+ .withCommand(Lists.newArrayList(commandsCfg))
+ .withVolumeMounts(volumeMounts)
+ .withVolumes(volumes)
+ .withWorkingDir(workingDir)
+ .createFile();
+
Tasks.addTagDynamically(BrooklynTaskTags.tagForEnvStream(BrooklynTaskTags.STREAM_ENV,
env));
+
+ try {
+
+ Duration timeout = EntityInitializers.resolve(config,
TIMEOUT);
+
+ ContainerTaskResult result = new ContainerTaskResult();
+ result.interestingJobs = MutableList.of();
+
+ ProcessTaskWrapper<ProcessTaskWrapper<?>> createNsJob
= null;
+ if (!Boolean.FALSE.equals(createNamespace)) {
+ ProcessTaskFactory<ProcessTaskWrapper<?>>
createNsJobF = newSimpleTaskFactory(
+ String.format(NAMESPACE_CREATE_CMD,
namespace)
+ //, String.format(NAMESPACE_SET_CMD,
namespace)
+ ).summary("Set up namespace").returning(x -> x);
+ if (createNamespace==null) {
+ createNsJobF.allowingNonZeroExitCode();
+ }
+ createNsJob =
DynamicTasks.queue(createNsJobF.newTask());
+ }
+
+ // only delete if told to always, unless we
successfully create it
+ boolean deleteNamespaceHere =
Boolean.TRUE.equals(deleteNamespace);
+ try {
+ if (createNsJob!=null) {
+ ProcessTaskWrapper<?> nsDetails =
createNsJob.get();
+ if (nsDetails.getExitCode()==0) {
+ LOG.debug("Namespace created");
+ if (deleteNamespace==null)
deleteNamespaceHere = true;
+ } else if (nsDetails.getExitCode()==1 &&
nsDetails.getStderr().contains("AlreadyExists")) {
+ if (Boolean.TRUE.equals(createNamespace)) {
+ LOG.warn("Namespace "+namespace+"
already exists; failing");
+ throw new
IllegalStateException("Namespace "+namespace+" exists when creating a job that
expects to create this namespace");
+ } else {
+ LOG.debug("Namespace exists already;
reusing it");
+ }
+ } else {
+ LOG.warn("Unexpected namespace creation
problem: "+nsDetails.getStderr()+ "(code "+nsDetails.getExitCode()+")");
+ if (deleteNamespace==null)
deleteNamespaceHere = true;
+ throw new
IllegalStateException("Unexpected namespace creation problem ("+namespace+");
see log for more details");
+ }
+ }
+
+ result.interestingJobs.add(DynamicTasks.queue(
+
newSimpleTaskFactory(String.format(JOBS_CREATE_CMD,
jobYaml.getFile().getAbsolutePath(), namespace)).summary("Submit
job").newTask()));
+
+ final CountdownTimer timer =
CountdownTimer.newInstanceStarted(timeout);
+
+ // use `wait --for` api, but in a 5s loop in case
there are other issues
+ boolean succeeded =
DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for
success or failure").body(() -> {
+ while (true) {
+ LOG.debug("Container job submitted, now
waiting on success or failure");
+
+ long secondsLeft = Math.min(Math.max(1,
timer.getDurationRemaining().toSeconds()), 5);
+ final AtomicInteger finishCount = new
AtomicInteger(0);
+
+ ProcessTaskWrapper<String> waitForSuccess
= Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_FEED_CMD,
secondsLeft, containerName, namespace))
+ .summary("Wait for success
('complete')").allowingNonZeroExitCode().newTask());
+ Entities.submit(entity, Tasks.create("Wait
for success then notify", () -> {
+ try {
+ if
(waitForSuccess.get().contains("condition met")) LOG.debug("Container job
"+namespace+" detected as completed (succeeded) in kubernetes");
+ } finally {
+ synchronized (finishCount) {
+ finishCount.incrementAndGet();
+ finishCount.notifyAll();
+ }
+ }
+ }));
+
+ ProcessTaskWrapper<String> waitForFailed =
Entities.submit(entity,
newSimpleTaskFactory(String.format(JOBS_FEED_FAILED_CMD, secondsLeft,
containerName, namespace))
+ .summary("Wait for
failed").allowingNonZeroExitCode().newTask());
+ Entities.submit(entity, Tasks.create("Wait
for failed then notify", () -> {
+ try {
+ if
(waitForFailed.get().contains("condition met")) LOG.debug("Container job
"+namespace+" detected as failed in kubernetes (may be valid non-zero exit)");
+ } finally {
+ synchronized (finishCount) {
+ finishCount.incrementAndGet();
+ finishCount.notifyAll();
+ }
+ }
+ }));
+
+ while (finishCount.get() == 0) {
+ LOG.debug("Container job "+namespace+"
waiting on complete or failed");
+ try {
+ synchronized (finishCount) {
+
finishCount.wait(Duration.TEN_SECONDS.toMilliseconds());
+ }
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ if (waitForSuccess.isDone() &&
waitForSuccess.getExitCode() == 0) return true;
+ if (waitForFailed.isDone() &&
waitForFailed.getExitCode() == 0) return false;
+ LOG.debug("Container job "+namespace+" not
yet complete, will retry");
+ // probably timed out or job not yet
available; short wait then retry
+ Time.sleep(Duration.millis(50));
+ if (timer.isExpired())
+ throw new
IllegalStateException("Timeout waiting for success or failure");
+
+ // any other one-off checks for job error,
we could do here
+
+ // finally get the partial log for
reporting
+ ProcessTaskWrapper<String> outputSoFarCmd
= DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD,
containerName, namespace)).summary("Retrieve output so far").newTask());
+
BrooklynTaskTags.setTransient(outputSoFarCmd.asTask());
+ String outputSoFar = outputSoFarCmd.get();
+ String newOutput =
outputSoFar.substring(stdout.size());
+ LOG.debug("Container job "+namespace+"
output: "+newOutput);
+
stdout.write(newOutput.getBytes(StandardCharsets.UTF_8));
+ }
+
+ }).build()).getUnchecked();
+ LOG.debug("Container job "+namespace+" completed,
success "+succeeded);
+
+ ProcessTaskWrapper<String> retrieveOutput =
DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD,
containerName, namespace)).summary("Retrieve output").newTask());
+ result.interestingJobs.add(retrieveOutput);
+
+ ProcessTaskWrapper<String> retrieveExitCode =
DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD,
namespace)).summary("Retrieve exit code").newTask());
+ result.interestingJobs.add(retrieveExitCode);
+
+ DynamicTasks.waitForLast();
+ result.mainStdout = retrieveOutput.get();
+
stdout.write(result.mainStdout.substring(stdout.size()).getBytes(StandardCharsets.UTF_8));
+
+ String exitCodeS = retrieveExitCode.getStdout();
+ if (Strings.isNonBlank(exitCodeS))
result.mainExitCode = Integer.parseInt(exitCodeS.trim());
+ else result.mainExitCode = -1;
+
+ if (result.mainExitCode!=0 &&
config.get(REQUIRE_EXIT_CODE_ZERO)) {
+ LOG.info("Failed container job "+namespace+"
(exit code "+result.mainExitCode+") output: "+result.mainExitCode);
+ throw new IllegalStateException("Non-zero exit
code (" + result.mainExitCode + ") disallowed");
+ }
+
+ return result;
+
+ } finally {
+ // clean up - delete namespace
+ if (!devMode && deleteNamespaceHere) {
+ LOG.debug("Deleting namespace "+namespace);
+ // do this not as a subtask so we can run even
if the main queue fails
+ Entities.submit(entity,
newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD,
namespace)).summary("Tear down containers").newTask()).block();
+ }
+ System.runFinalization();
+ }
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ } finally {
+ jobYaml.deleteIfTemp();
+ }
+ });
- TaskBuilder<RET> taskBuilder = Tasks.<RET>builder()
- .displayName(this.summary)
- .add(buildKubeTask(configBag, "Set Up and Run",
- String.format(NAMESPACE_CREATE_CMD,containerName),
- String.format(NAMESPACE_SET_CMD,containerName)))
- .add(runCommandsTask)
- .add(waitTask)
- .add(buildKubeTask(configBag, "Retrieve Output",
String.format(JOBS_LOGS_CMD,containerName)));
-
- if(!devMode) {
- taskBuilder.add(buildKubeTask(configBag, "Tear Down",
String.format(NAMESPACE_DELETE_CMD,containerName)));
- }
return taskBuilder.build();
}
- public T summary(String summary) {
- this.summary = summary;
+ public T summary(String summary) { this.summary = summary; return self(); }
+ public T timeout(Duration timeout) { config.put(TIMEOUT, timeout); return
self(); }
+ public T command(List<String> commands) { config.put(COMMAND, commands);
return self(); }
+ public T command(String baseCommandWithNoArgs, String
...extraCommandArguments) { config.put(COMMAND,
MutableList.of(baseCommandWithNoArgs).appendAll(Arrays.asList(extraCommandArguments)));
return self(); }
+ public T bashScriptCommands(List<String> commands) {
+ config.put(BASH_SCRIPT, commands);
+ return self();
+ }
+ public T bashScriptCommands(String firstCommandAndArgs, String
...otherCommandAndArgs) { return
bashScriptCommands(MutableList.of(firstCommandAndArgs).appendAll(Arrays.asList(otherCommandAndArgs)));
}
+ public T image(String image) { config.put(CONTAINER_IMAGE, image); return
self(); }
+ public T allowNonZeroExitCode() { return allowNonZeroExitCode(true); }
+ public T allowNonZeroExitCode(boolean allowNonZero) {
config.put(REQUIRE_EXIT_CODE_ZERO, !allowNonZero); return self(); }
+ public T imagePullPolicy(PullPolicy policy) {
config.put(CONTAINER_IMAGE_PULL_POLICY, policy); return self(); }
+ public T env(Map<String,?> map) {
+ config.put(BrooklynConfigKeys.SHELL_ENVIRONMENT, MutableMap.copyOf(
map ) );
+ return self();
+ }
+ public T env(String key, Object val) {
+ return env(MutableMap.copyOf(
config.get(BrooklynConfigKeys.SHELL_ENVIRONMENT) ).add(key, val));
+ }
+
+ /** specify the namespace to use, and whether to create or delete it. by
default a randomly generated namespace is used and always cleaned up,
+ * but by using this, a caller can ensure the namespace persists. if using
this, it is the caller's responsibility to avoid collisions.
+ *
+ * @param namespace namespace to use
+ * @param create whether to create; null (default) is to auto-detect,
create it if it doesn't exist
+ * @param delete whether to delete; null (default) means to delete it if
and only if we created it (and not dev mode)
+ */
+ public T useNamespace(String namespace, Boolean create, Boolean delete) {
+ this.namespace = namespace;
+ this.createNamespace = create;
+ this.deleteNamespace = delete;
return self();
}
- public T tag(String tag) {
- this.tag = tag;
+ public T deleteNamespace(Boolean delete) { this.deleteNamespace = delete;
return self(); }
+
+ /** visible in the container environment */
+ public T jobIdentifier(String jobIdentifier) {
+ this.jobIdentifier = jobIdentifier;
return self();
}
@@ -127,23 +337,36 @@ public class ContainerTaskFactory<T extends
ContainerTaskFactory<T,RET>,RET> im
return self();
}
- public static ProcessTaskWrapper<String> buildKubeTask(final ConfigBag
configBag, final String taskSummary, final String... kubeCommands) {
- Map<String, Object> env = EntityInitializers.resolve(configBag,
BrooklynConfigKeys.SHELL_ENVIRONMENT);
- Map<String, String> envVars = MutableMap.of();
- if(env != null && env.size() > 0) {
- env.forEach((k,v) -> envVars.put(k, v.toString()));
- }
+ private ProcessTaskFactory<String> newSimpleTaskFactory(final String...
kubeCommands) {
return new
SystemProcessTaskFactory.ConcreteSystemProcessTaskFactory<String>(kubeCommands)
- .summary(taskSummary)
- .configure(configBag.getAllConfig())
- .environmentVariables(envVars) // needed to be shown in the UI
;)
+ //i think we don't care about any of these configs, and most
cause debug messages about them being ignored
+ //.configure(config.getAllConfig())
+ //.environmentVariables(envVars) // needed to be shown in the
UI ;)
+
.<String>returning(ProcessTaskStub.ScriptReturnType.STDOUT_STRING)
- .requiringExitCodeZero().newTask();
+ .requiringExitCodeZero();
}
- public static class ConcreteContainerTaskFactory<RET> extends
ContainerTaskFactory<ConcreteContainerTaskFactory<RET>, RET> {
+ public static class ConcreteContainerTaskFactory extends
ContainerTaskFactory<ConcreteContainerTaskFactory> {
public ConcreteContainerTaskFactory() {
super();
}
}
+
+ public static class ContainerTaskResult {
+ private List<ProcessTaskWrapper<?>> interestingJobs;
+ private String mainStdout;
+ private Integer mainExitCode;
+
+ /** This will be 0 unless allowNonZeroExitCode was specified */
+ public Integer getMainExitCode() {
+ return mainExitCode;
+ }
+ public String getMainStdout() {
+ return mainStdout;
+ }
+ public List<ProcessTaskWrapper<?>> getInterestingJobs() {
+ return interestingJobs;
+ }
+ }
}
diff --git
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/JobBuilder.java
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/KubeJobFileCreator.java
similarity index 88%
rename from
software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/JobBuilder.java
rename to
software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/KubeJobFileCreator.java
index 9249393f44..5b090d7442 100644
---
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/JobBuilder.java
+++
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/KubeJobFileCreator.java
@@ -20,6 +20,7 @@ package org.apache.brooklyn.tasks.kubectl;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.brooklyn.core.mgmt.ha.BrooklynBomOsgiArchiveInstaller;
import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,8 +40,8 @@ import java.util.stream.Collectors;
/**
* This was needed to ensure our Kubernetes Yaml Job configurations are valid.
*/
-public class JobBuilder {
- private static final Logger LOG =
LoggerFactory.getLogger(JobBuilder.class);
+public class KubeJobFileCreator {
+ private static final Logger LOG =
LoggerFactory.getLogger(KubeJobFileCreator.class);
String jobName;
String imageName;
@@ -51,21 +52,21 @@ public class JobBuilder {
String prefix = "brooklyn-job";
- List<String> commands = Lists.newArrayList();
+ List<String> command = Lists.newArrayList();
List<String> args = Lists.newArrayList();
- Map<String, Object> env = Maps.newHashMap();
+ Map<String, String> env = Maps.newHashMap();
List<Map<String,String>> volumeMounts = Lists.newArrayList();
List<Map<String, Object>> volumes = Lists.newArrayList();
- public JobBuilder withName(final String name) {
+ public KubeJobFileCreator withName(final String name) {
this.jobName = name;
return this;
}
- public JobBuilder withImage(final String image){
+ public KubeJobFileCreator withImage(final String image){
this.imageName = image;
return this;
}
@@ -75,59 +76,59 @@ public class JobBuilder {
* @param eimagePullPolicy
* @return
*/
- public JobBuilder withImagePullPolicy(final PullPolicy eimagePullPolicy){
+ public KubeJobFileCreator withImagePullPolicy(final PullPolicy
eimagePullPolicy){
if (eimagePullPolicy != null) {
this.imagePullPolicy = eimagePullPolicy.val();
}
return this;
}
- public JobBuilder withCommands(final List<String> commandsArg){
- if (commandsArg != null) {
- this.commands.addAll(commandsArg);
+ public KubeJobFileCreator withCommand(final List<String>
commandAndEntryPointArgs){
+ if (commandAndEntryPointArgs != null) {
+ this.command.addAll(commandAndEntryPointArgs);
}
return this;
}
- public JobBuilder withArgs(final List<String> args){
+ public KubeJobFileCreator withArgs(final List<String> args){
if (args != null) {
this.args.addAll(args);
}
return this;
}
- public JobBuilder withVolumeMounts(final Set<Map<String,String>>
volumeMounts) {
+ public KubeJobFileCreator withVolumeMounts(final Set<Map<String,String>>
volumeMounts) {
if (volumeMounts != null) {
this.volumeMounts.addAll(volumeMounts);
}
return this;
}
- public JobBuilder withVolumes(final Set<Map<String, Object>> volumes) {
+ public KubeJobFileCreator withVolumes(final Set<Map<String, Object>>
volumes) {
if (volumes != null) {
this.volumes.addAll(volumes);
}
return this;
}
- public JobBuilder withWorkingDir(String workingDir) {
+ public KubeJobFileCreator withWorkingDir(String workingDir) {
this.workingDir = workingDir;
return this;
}
- public JobBuilder withPrefix(final String prefixArg){
+ public KubeJobFileCreator withPrefix(final String prefixArg){
this.prefix = prefixArg;
return this;
}
- public JobBuilder withEnv(final Map<String,Object> env){
+ public KubeJobFileCreator withEnv(final Map<String,String> env){
if (env != null) {
this.env.putAll(env);
}
return this;
}
- public String build(){
+ public BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> createFile(){
JobTemplate jobTemplate = new JobTemplate(jobName);
ContainerSpec containerSpec =
jobTemplate.getSpec().getTemplate().getContainerSpec(0);
@@ -142,13 +143,13 @@ public class JobBuilder {
List<Map<String,String>> envList = env.entrySet().stream().map (e
-> {
Map<String,String> envItem = new HashMap<>();
envItem.put("name", e.getKey());
- envItem.put("value", e.getValue().toString());
+ envItem.put("value", e.getValue());
return envItem;
}).collect(Collectors.toList());
containerSpec.setEnv(envList);
}
- if (!commands.isEmpty()) {
- containerSpec.setCommand(this.commands);
+ if (!command.isEmpty()) {
+ containerSpec.setCommand(this.command);
}
if (!args.isEmpty()) {
containerSpec.setArgs(this.args);
@@ -177,7 +178,7 @@ public class JobBuilder {
return serializeAndWriteToTempFile(jobTemplate);
}
- private String serializeAndWriteToTempFile(JobTemplate jobTemplate) {
+ private BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File>
serializeAndWriteToTempFile(JobTemplate jobTemplate) {
DumperOptions options = new DumperOptions();
options.setIndent(2);
options.setPrettyFlow(true);
@@ -203,8 +204,8 @@ public class JobBuilder {
PrintWriter sw = new PrintWriter(jobBodyPath);
Yaml yaml = new Yaml(representer, options);
yaml.dump(jobTemplate, sw);
- LOG.info("Job body dumped at: {}" , jobBodyPath.getAbsolutePath());
- return jobBodyPath.getAbsolutePath();
+ LOG.debug("Job body dumped at: {}" ,
jobBodyPath.getAbsolutePath());
+ return new
BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<>(jobBodyPath, true);
} catch (IOException e) {
throw new RuntimeException("Failed to create temp file for
container", e);
}
@@ -226,7 +227,7 @@ class TemplateSpec {
/**
* To do so, set .spec.backoffLimit to specify the number of retries
before considering a Job as failed. The back-off limit is set by default to 6.
*/
- Integer backoffLimit = 1;
+ Integer backoffLimit = 0;
JobSpec template;
diff --git
a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerEffectorTest.java
b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerEffectorTest.java
index cee55084e3..d882b2fd06 100644
---
a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerEffectorTest.java
+++
b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerEffectorTest.java
@@ -49,7 +49,8 @@ public class ContainerEffectorTest extends
BrooklynAppUnitTestSupport {
ContainerEffector.EFFECTOR_NAME, "test-container-effector",
ContainerCommons.CONTAINER_IMAGE, "perl",
ContainerCommons.CONTAINER_IMAGE_PULL_POLICY,
PullPolicy.IF_NOT_PRESENT,
- ContainerCommons.COMMANDS, ImmutableList.of("/bin/bash", "-c",
"echo " + message),
+// ContainerCommons.COMMAND, ImmutableList.of("/bin/bash",
"-c", "echo " + message),
+ ContainerCommons.BASH_SCRIPT, "echo "+message+" ${HELLO}",
BrooklynConfigKeys.SHELL_ENVIRONMENT, ImmutableMap.<String,
Object>of("HELLO", "WORLD")));
ContainerEffector initializer = new ContainerEffector(parameters);
@@ -58,7 +59,7 @@ public class ContainerEffectorTest extends
BrooklynAppUnitTestSupport {
EntityAsserts.assertAttributeEqualsEventually(parentEntity,
Attributes.SERVICE_UP, true);
Object output = Entities.invokeEffector(app, parentEntity,
parentEntity.getEffector("test-container-effector")).getUnchecked(Duration.ONE_MINUTE);
- assertTrue(output.toString().contains(message));
+ assertTrue(output.toString().contains(message+" WORLD"));
}
@Test
diff --git
a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
index 64a2f5f1c7..5de54807d3 100644
---
a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
+++
b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
@@ -41,7 +41,7 @@ public class ContainerSensorTest extends
BrooklynAppUnitTestSupport {
ConfigBag parameters = ConfigBag.newInstance(ImmutableMap.of(
ContainerCommons.CONTAINER_IMAGE, "perl",
ContainerCommons.CONTAINER_IMAGE_PULL_POLICY,
PullPolicy.IF_NOT_PRESENT,
- ContainerCommons.COMMANDS, ImmutableList.of("/bin/bash",
"-c","echo " + message) ,
+ ContainerCommons.COMMAND, ImmutableList.of("/bin/bash",
"-c","echo " + message) ,
ContainerSensor.SENSOR_PERIOD, "1s",
ContainerSensor.SENSOR_NAME, "test-echo-sensor"));
@@ -59,7 +59,7 @@ public class ContainerSensorTest extends
BrooklynAppUnitTestSupport {
ConfigBag parameters = ConfigBag.newInstance(ImmutableMap.of(
ContainerCommons.CONTAINER_IMAGE, "perl",
- ContainerCommons.COMMANDS, ImmutableList.of("/bin/bash") ,
+ ContainerCommons.COMMAND, ImmutableList.of("/bin/bash") ,
ContainerCommons.ARGUMENTS, ImmutableList.of("-c", "echo " +
message) ,
ContainerSensor.SENSOR_PERIOD, "1s",
ContainerSensor.SENSOR_NAME, "test-echo-sensor"));
@@ -112,7 +112,7 @@ public class ContainerSensorTest extends
BrooklynAppUnitTestSupport {
public void testTfVersionSensor() {
ConfigBag parameters = ConfigBag.newInstance(ImmutableMap.of(
ContainerCommons.CONTAINER_IMAGE,
"hashicorp/terraform:1.3.0-alpha20220622",
- ContainerCommons.COMMANDS, ImmutableList.of("terraform",
"version" ),
+ ContainerCommons.COMMAND, ImmutableList.of("terraform",
"version" ),
ContainerSensor.SENSOR_PERIOD, "1s",
ContainerSensor.SENSOR_NAME, "tf-version-sensor"));
diff --git
a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
index 36fe0acc08..4dda230118 100644
---
a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
+++
b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
@@ -19,27 +19,21 @@
package org.apache.brooklyn.tasks.kubectl;
import com.beust.jcommander.internal.Maps;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.mgmt.HasTaskChildren;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.time.Duration;
import org.testng.annotations.Test;
import java.util.HashMap;
-import java.util.List;
-
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import static org.testng.AssertJUnit.assertTrue;
@@ -53,107 +47,181 @@ public class ContainerTaskTest extends
BrooklynAppUnitTestSupport {
public void testSuccessfulContainerTask() {
TestEntity entity =
app.createAndManageChild(EntitySpec.create(TestEntity.class));
- Map<String,Object> configBag = new HashMap<>();
- configBag.put("name", "test-container-task");
- configBag.put("image", "perl");
- configBag.put("commands", Lists.newArrayList("/bin/bash", "-c","echo
'hello test'"));
- configBag.put("imagePullPolicy", PullPolicy.IF_NOT_PRESENT);
-
- Task<String> containerTask = new
ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new
ContainerTaskFactory.ConcreteContainerTaskFactory()
.summary("Running container task")
- .configure(configBag)
+ .jobIdentifier("test-container-task")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .timeout(Duration.TWO_MINUTES)
+ .image("perl")
+ .command( "/bin/bash", "-c","echo 'hello test'" )
.newTask();
+
DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
- Object result = containerTask.getUnchecked(Duration.of(5,
TimeUnit.MINUTES));
- List<String> res = (List<String>) result;
- while(!res.isEmpty() && Iterables.getLast(res).matches("namespace .*
deleted\\s*")) res = res.subList(0, res.size()-1);
+ ContainerTaskFactory.ContainerTaskResult result =
containerTask.getUnchecked(Duration.ONE_MINUTE);
+ Asserts.assertEquals(result.getMainStdout().trim(), "hello test");
+ }
+
+ @Test
+ public void testContainerTaskWithVar() {
+ TestEntity entity =
app.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new
ContainerTaskFactory.ConcreteContainerTaskFactory()
+ .summary("Running container task")
+ .jobIdentifier("test-container-task")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .timeout(Duration.TWO_MINUTES)
+ .image("perl")
+ .env("test_name", "EnvTest")
+ .bashScriptCommands("echo hello ${test_name}" )
+ .newTask();
- String res2 = res.isEmpty() ? null : Iterables.getLast(res);
- assertTrue(res2.startsWith("hello test"));
+ DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+ ContainerTaskFactory.ContainerTaskResult result =
containerTask.getUnchecked(Duration.ONE_MINUTE);
+ Asserts.assertEquals(result.getMainStdout().trim(), "hello EnvTest");
+ Asserts.assertEquals(BrooklynTaskTags.stream(containerTask,
BrooklynTaskTags.STREAM_ENV).streamContents.get().trim(),
"test_name=\"EnvTest\"");
}
@Test
public void testSuccessfulContainerTerraformTask() {
TestEntity entity =
app.createAndManageChild(EntitySpec.create(TestEntity.class));
- Map<String,Object> configBag = new HashMap<>();
- configBag.put("name", "test-container-task");
- configBag.put("image", "hashicorp/terraform:latest");
- configBag.put("commands", ImmutableList.of("terraform", "version" ));
- configBag.put("imagePullPolicy", PullPolicy.IF_NOT_PRESENT);
-
- Task<String> containerTask = new
ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new
ContainerTaskFactory.ConcreteContainerTaskFactory()
.summary("Running terraform-container task")
- .configure(configBag)
+ .jobIdentifier("test-container-task")
+ .timeout(Duration.TWO_MINUTES)
+ .image("hashicorp/terraform:latest")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .command( "terraform", "version" )
.newTask();
DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
- Object result = containerTask.getUnchecked(Duration.of(5,
TimeUnit.MINUTES));
- List<String> res = (List<String>) result;
- while(!res.isEmpty() && Iterables.getLast(res).matches("namespace .*
deleted\\s*")) res = res.subList(0, res.size()-1);
- String res2 = res.isEmpty() ? null : Iterables.getLast(res);
- assertTrue(res2.startsWith("Terraform"));
+ ContainerTaskFactory.ContainerTaskResult result =
containerTask.getUnchecked();
+ assertTrue(result.getMainStdout().startsWith("Terraform"));
}
- @Test// tries to execute local command, wants it to fail, but even so best
as integration
- public void testFailingContainerTask() {
+ @Test // execute local command, assert we get exit code, and it fails
+ public void testExpectedFailingContainerTask() {
TestEntity entity =
app.createAndManageChild(EntitySpec.create(TestEntity.class));
- List<String> commands = MutableList.of("/bin/bash", "-c","echo 'hello
test' & exit 1");
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new
ContainerTaskFactory.ConcreteContainerTaskFactory()
+ .summary("Running container task")
+ .jobIdentifier("test-container-task")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .timeout(Duration.TWO_MINUTES)
+ .image("perl")
+ .command( "/bin/bash", "-c","echo 'hello test' && exit 42" )
+ .allowNonZeroExitCode()
+ .newTask();
+ DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+
+
Asserts.assertTrue(containerTask.blockUntilEnded(Duration.ONE_MINUTE)); //
should complete in 1 minute, i.e. we detect the failure
+ Asserts.assertTrue(containerTask.isDone());
+
Asserts.assertEquals((int)containerTask.getUnchecked().getMainExitCode(), 42);
+
Asserts.assertEquals(containerTask.getUnchecked().getMainStdout().trim(),
"hello test");
+ }
- Map<String,Object> configBag = new HashMap<>();
- configBag.put("name", "test-container-task");
- configBag.put("image", "perl");
- configBag.put("commands", commands);
- configBag.put("timeout", "1m");
-
- Task<String> containerTask = new
ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
- .summary("Running docker task")
- .configure(configBag)
+
+ @Test // execute local command, assert we get exit code, and it fails
+ public void testSleepingAndExpectedFailingContainerTask() {
+ TestEntity entity =
app.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new
ContainerTaskFactory.ConcreteContainerTaskFactory()
+ .summary("Running container task")
+ .jobIdentifier("test-container-task")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .timeout(Duration.TWO_MINUTES)
+ .image("perl")
+ .bashScriptCommands("echo starting", "sleep 6", "echo done",
"exit 42", "echo ignored")
+ .allowNonZeroExitCode()
.newTask();
- try {
-
DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity).getTask().get();
- if (containerTask instanceof HasTaskChildren) {
- for (Task<?> child:
((HasTaskChildren)containerTask).getChildren()) {
-
if(child.getTags().contains(BrooklynTaskTags.INESSENTIAL_TASK) &&
child.isError()) {
- child.get();
- }
- }
- }
- } catch (Exception e) {
- Asserts.expectedFailureContains(e, "Process task ended with exit
code", "when 0 was required");
- }
+ DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+
+ Asserts.eventually(() -> {
+ BrooklynTaskTags.WrappedStream stream =
BrooklynTaskTags.stream(containerTask, BrooklynTaskTags.STREAM_STDOUT);
+ if (stream==null) return null;
+ return stream.streamContents.get().trim();
+ }, x -> x.equals("starting"));
+ Asserts.assertFalse(containerTask.isDone());
+
+ // may be occasional timing glitches here, eg if namespace cleanup
takes > 3s
+ Stopwatch sw = Stopwatch.createStarted();
+ // should complete in under 10s, i.e. we detect the failure in less
than 5s after
+
Asserts.assertTrue(containerTask.blockUntilEnded(Duration.seconds(15)), "should
definitely finish within 15s of starting");
+ Asserts.assertThat(Duration.of(sw.elapsed()), dur ->
dur.isShorterThan(Duration.seconds(10)));
+
+ Asserts.assertTrue(containerTask.isDone());
+ Asserts.assertEquals(BrooklynTaskTags.stream(containerTask,
BrooklynTaskTags.STREAM_STDOUT).streamContents.get().trim(), "starting\ndone");
+
Asserts.assertEquals((int)containerTask.getUnchecked().getMainExitCode(), 42);
+
Asserts.assertEquals(containerTask.getUnchecked().getMainStdout().trim(),
"starting\ndone");
+ }
+
+ @Test // execute local command, assert it fails if a non-zero exit code is
not allowed
+ public void testNotExpectedFailingContainerTask() {
+ TestEntity entity =
app.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new
ContainerTaskFactory.ConcreteContainerTaskFactory()
+ .summary("Running container task")
+ .jobIdentifier("test-container-task")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .timeout(Duration.TWO_MINUTES)
+ .image("perl")
+ .command( "/bin/bash", "-c","echo 'hello test' && exit 42" )
+// .allowNonZeroExitCode()
+ .newTask();
+ DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+
+
Asserts.assertTrue(containerTask.blockUntilEnded(Duration.ONE_MINUTE)); //
should complete in 1 minute, i.e. we detect the failure
+ Asserts.assertTrue(containerTask.isDone());
+ Asserts.assertTrue(containerTask.isError());
+ Asserts.assertFailsWith(() -> containerTask.getUnchecked(), error ->
Asserts.expectedFailureContainsIgnoreCase(error, "Non-zero", "42"));
}
@Test
public void testScriptContainerTask() {
TestEntity entity =
app.createAndManageChild(EntitySpec.create(TestEntity.class));
Map<String,Object> volumes = Maps.newHashMap();
- volumes.put("name", "tf-ws");
- volumes.put("hostPath", Maps.newHashMap("path", "/tfws"));
-
- List<String> commands = MutableList.of("./hello.sh");
+ String volumeId = "brooklyn-container-task-test-volume";
+ String uid = Identifiers.makeRandomId(8).toLowerCase();
+ volumes.put("name", volumeId);
+ // sometimes this can be mounted from local drive, but does not always
need to be
+ volumes.put("hostPath", Maps.newHashMap("path",
"/tmp/brooklyn-container-task-test-shared"));
Map<String,Object> configBag = new HashMap<>();
- configBag.put("name", "test-container-task");
- configBag.put("image", "hhwang927/ubuntu_base");
- configBag.put("imagePullPolicy", "never");
- configBag.put("commands", commands);
- configBag.put("workingDir", "/tfws/scripts");
+ configBag.put("workingDir", "/brooklyn-mount-dir/scripts");
configBag.put("volumes", Sets.newHashSet(volumes));
- configBag.put("volumeMounts", Sets.newHashSet(Maps.newHashMap("name",
"tf-ws", "mountPath", "/tfws")));
+ configBag.put("volumeMounts", Sets.newHashSet(Maps.newHashMap("name",
volumeId, "mountPath", "/brooklyn-mount-dir")));
- Task<String> containerTask = new
ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
- .summary("Running docker task")
- .configure(configBag)
- .newTask();
- DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
- Object result = containerTask.getUnchecked(Duration.of(5,
TimeUnit.MINUTES));
- List<String> res = (List<String>) result;
- while(!res.isEmpty() && Iterables.getLast(res).matches("namespace .*
deleted\\s*")) res = res.subList(0, res.size()-1);
+ ContainerTaskFactory.ConcreteContainerTaskFactory baseFactory = new
ContainerTaskFactory.ConcreteContainerTaskFactory()
+ .summary("Running container task")
+ .jobIdentifier("test-container-task")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .timeout(Duration.TWO_MINUTES)
+ .image("hhwang927/ubuntu_base")
+ .useNamespace("brooklyn-container-test-"+uid, null, false)
+ .configure(configBag);
- String res2 = res.isEmpty() ? null : Iterables.getLast(res);
- assertTrue(res2.contains("hello"));
+ try {
+ // first create the script
+ Task<ContainerTaskFactory.ContainerTaskResult> setup =
baseFactory.bashScriptCommands(
+ "mkdir -p /brooklyn-mount-dir/scripts",
+ "cd /brooklyn-mount-dir/scripts",
+ "echo echo hello " + uid + " > hello-"+uid+".sh",
+ "chmod +x hello-"+uid+".sh"
+ ).newTask();
+
DynamicTasks.queueIfPossible(setup).orSubmitAsync(entity).getTask().getUnchecked();
+
+ // now make a new container that should see the same mount point,
and try running the command
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask =
baseFactory.bashScriptCommands(
+ "./hello-"+uid+".sh"
+ )
+ .newTask();
+ DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+ ContainerTaskFactory.ContainerTaskResult result =
containerTask.getUnchecked(Duration.ONE_MINUTE);
+ Asserts.assertEquals(result.getMainStdout().trim(), "hello " +
uid);
+
+ } finally {
+ DynamicTasks.queueIfPossible( baseFactory.summary("cleaning
up").deleteNamespace(true).bashScriptCommands("rm hello-"+uid+".sh")
).orSubmitAsync(entity);
+ }
}
}
diff --git
a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/JobBuilderTest.java
b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/KubeJobSpecCreatorTest.java
similarity index 74%
rename from
software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/JobBuilderTest.java
rename to
software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/KubeJobSpecCreatorTest.java
index 1522271150..fca932cd95 100644
---
a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/JobBuilderTest.java
+++
b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/KubeJobSpecCreatorTest.java
@@ -21,35 +21,36 @@ package org.apache.brooklyn.tasks.kubectl;
import com.beust.jcommander.internal.Maps;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.brooklyn.core.mgmt.ha.BrooklynBomOsgiArchiveInstaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
+import java.io.File;
import java.nio.file.Files;
-import java.nio.file.Paths;
import java.util.Map;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-public class JobBuilderTest {
- private static final Logger LOG =
LoggerFactory.getLogger(JobBuilderTest.class);
+public class KubeJobSpecCreatorTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(KubeJobSpecCreatorTest.class);
@Test
public void testPerlWithArgs() throws Exception{
- String yamlJobLocation =
- new JobBuilder().withImage("perl").withName("perl-args-test")
+ BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> yamlJobLocation
=
+ new
KubeJobFileCreator().withImage("perl").withName("perl-args-test")
.withArgs(Lists.newArrayList( "echo", "aaa"))
.withImagePullPolicy(PullPolicy.ALWAYS) // explicit
"Always"
- .build();
+ .createFile();
assertNotNull(yamlJobLocation);
- String actual = String.join("\n",
Files.readAllLines(Paths.get(yamlJobLocation)));
+ String actual = String.join("\n",
Files.readAllLines(yamlJobLocation.getFile().toPath()));
String expected = "apiVersion: batch/v1\n" +
"kind: Job\n" +
"metadata:\n" +
" name: perl-args-test\n" +
"spec:\n" +
- " backoffLimit: 1\n" +
+ " backoffLimit: 0\n" +
" completions: 1\n" +
" parallelism: 1\n" +
" template:\n" +
@@ -68,20 +69,20 @@ public class JobBuilderTest {
@Test
public void testPerlWithArgsAndCommand() throws Exception{
- String yamlJobLocation =
- new
JobBuilder().withImage("perl").withName("perl-args-and-command-test")
- .withCommands(Lists.newArrayList("/bin/bash"))
+ BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> yamlJobLocation
=
+ new
KubeJobFileCreator().withImage("perl").withName("perl-args-and-command-test")
+ .withCommand(Lists.newArrayList("/bin/bash"))
.withArgs(Lists.newArrayList("-c", "echo aaa"))
.withImagePullPolicy(PullPolicy.NEVER)
- .build();
+ .createFile();
assertNotNull(yamlJobLocation);
- String actual = String.join("\n",
Files.readAllLines(Paths.get(yamlJobLocation)));
+ String actual = String.join("\n",
Files.readAllLines(yamlJobLocation.getFile().toPath()));
String expected = "apiVersion: batch/v1\n" +
"kind: Job\n" +
"metadata:\n" +
" name: perl-args-and-command-test\n" +
"spec:\n" +
- " backoffLimit: 1\n" +
+ " backoffLimit: 0\n" +
" completions: 1\n" +
" parallelism: 1\n" +
" template:\n" +
@@ -102,19 +103,19 @@ public class JobBuilderTest {
@Test
public void testPerlCommand() throws Exception{
- String yamlJobLocation =
- new
JobBuilder().withImage("perl").withName("perl-command-test")
- .withCommands(Lists.newArrayList("/bin/bash", "-c",
"echo aaa"))
+ BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> yamlJobLocation
=
+ new
KubeJobFileCreator().withImage("perl").withName("perl-command-test")
+ .withCommand(Lists.newArrayList("/bin/bash", "-c",
"echo aaa"))
.withImagePullPolicy(PullPolicy.IF_NOT_PRESENT)
- .build();
+ .createFile();
assertNotNull(yamlJobLocation);
- String actual = String.join("\n",
Files.readAllLines(Paths.get(yamlJobLocation)));
+ String actual = String.join("\n",
Files.readAllLines(yamlJobLocation.getFile().toPath()));
String expected = "apiVersion: batch/v1\n" +
"kind: Job\n" +
"metadata:\n" +
" name: perl-command-test\n" +
"spec:\n" +
- " backoffLimit: 1\n" +
+ " backoffLimit: 0\n" +
" completions: 1\n" +
" parallelism: 1\n" +
" template:\n" +
@@ -137,21 +138,20 @@ public class JobBuilderTest {
Map<String,Object> volumes = Maps.newHashMap();
volumes.put("name", "tf-ws");
volumes.put("hostPath", Maps.newHashMap("path", "/tfws"));
- String yamlJobLocation =
- new
JobBuilder().withImage("hashicorp/terraform").withName("tf-version")
- .withVolumes(Sets.newHashSet(volumes))
-
.withVolumeMounts(Sets.newHashSet(Maps.newHashMap("name", "tf-ws", "mountPath",
"/tfws")))
- .withCommands(Lists.newArrayList("terraform",
"version"))
- .withWorkingDir("/tfws/app1")
- .build();
+ BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> yamlJobLocation
= new
KubeJobFileCreator().withImage("hashicorp/terraform").withName("tf-version")
+ .withVolumes(Sets.newHashSet(volumes))
+ .withVolumeMounts(Sets.newHashSet(Maps.newHashMap("name",
"tf-ws", "mountPath", "/tfws")))
+ .withCommand(Lists.newArrayList("terraform", "version"))
+ .withWorkingDir("/tfws/app1")
+ .createFile();
assertNotNull(yamlJobLocation);
- String actual = String.join("\n",
Files.readAllLines(Paths.get(yamlJobLocation)));
+ String actual = String.join("\n",
Files.readAllLines(yamlJobLocation.getFile().toPath()));
String expected = "apiVersion: batch/v1\n" +
"kind: Job\n" +
"metadata:\n" +
" name: tf-version\n" +
"spec:\n" +
- " backoffLimit: 1\n" +
+ " backoffLimit: 0\n" +
" completions: 1\n" +
" parallelism: 1\n" +
" template:\n" +