This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 3259cacca18c288141952fb7840cbeeb984f89bc Author: Alex Heneveld <[email protected]> AuthorDate: Wed Aug 24 13:00:42 2022 +0100 improve logic for configuring commandUrl to ssh-command-sensor previous implementation (a) wouldn't install if the target didn't have an install dir, and (b) wouldn't work if install dir wasn't set until after a rebind (because subscriptions aren't persisted; the logic for setting up needs to be part of the feed) --- .../brooklyn/core/sensor/ssh/SshCommandSensor.java | 55 ++++++++-------------- .../apache/brooklyn/feed/AbstractCommandFeed.java | 31 ++++++++++-- .../java/org/apache/brooklyn/feed/ssh/SshFeed.java | 51 ++++++++++++++++++++ .../org/apache/brooklyn/feed/windows/CmdFeed.java | 7 +++ 4 files changed, 106 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java index 3632257361..551a14927a 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java @@ -31,6 +31,8 @@ import org.apache.brooklyn.api.entity.EntityInitializer; import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.mgmt.TaskFactory; import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.config.MapConfigKey; @@ -69,6 +71,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; /** * Configurable {@link EntityInitializer} which adds an SSH sensor feed running the <code>command</code> supplied @@ -86,7 +89,7 @@ public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> { public static final ConfigKey<String> SENSOR_COMMAND_URL = ConfigKeys.newStringConfigKey("commandUrl", "Remote SSH command to execute for sensor (takes precedence over command)"); public static final ConfigKey<String> SENSOR_EXECUTION_DIR = ConfigKeys.newStringConfigKey("executionDir", "Directory where the command should run; " + "if not supplied, executes in the entity's run dir (or home dir if no run dir is defined); " - + "use '~' to always execute in the home dir, or 'custom-feed/' to execute in a custom-feed dir relative to the run dir"); + + "use '~' to always execute in the home dir, or 'custom-feed/' to execute in a custom-feed dir relative to the run dir; not compatible with commandUrl"); public static final ConfigKey<Object> VALUE_ON_ERROR = ConfigKeys.newConfigKey(Object.class, "value.on.error", "Value to be used if an error occurs whilst executing the ssh command", null); public static final MapConfigKey<Object> SENSOR_SHELL_ENVIRONMENT = BrooklynConfigKeys.SHELL_ENVIRONMENT; @@ -109,36 +112,9 @@ public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> { public void apply(final EntityLocal entity) { ConfigBag params = initParams(); - String commandUrl = EntityInitializers.resolve(initParams(), SENSOR_COMMAND_URL); - if (Objects.nonNull(commandUrl)) { - - entity.subscriptions().subscribe(entity, BrooklynConfigKeys.INSTALL_DIR, booleanSensorEvent -> { - if (Strings.isNonBlank(booleanSensorEvent.getValue()) && !commandUrlInstalled.get()) { - - // Prepare path for a remote command script. - // Take into account possibility of multiple ssh commands initialized at the same entity. - String commandUrlPath = booleanSensorEvent.getValue() + "/command-url-" + Identifiers.makeRandomId(4)+ ".sh"; - - // Look for SshMachineLocation and install remote command script. - Maybe<SshMachineLocation> locationMaybe = Locations.findUniqueSshMachineLocation(entity.getLocations()); - if (locationMaybe.isPresent()) { - TaskFactory<?> install = SshTasks.installFromUrl(locationMaybe.get(), commandUrl, commandUrlPath); - Object ret = DynamicTasks.queueIfPossible(install.newTask()).orSubmitAsync(entity).andWaitForSuccess(); - - // Prevent command duplicates in case if INSTALL_DIR changed from the outside. - commandUrlInstalled.set(true); - } else { - throw new IllegalStateException("Could not find SshMachineLocation to run 'commandUrl'"); - } - - // Run a deferred command. - params.putStringKey(SENSOR_COMMAND.getName(), "bash " + commandUrlPath); - apply(entity, params); - } - }); - } else { - apply(entity, params); - } + // previously if a commandUrl was used we would listen for the install dir to be set; but that doesn't survive rebind; + // now we install on first run as part of the SshFeed + apply(entity, params); } private void apply(final EntityLocal entity, final ConfigBag params) { @@ -152,7 +128,7 @@ public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> { } Supplier<Map<String,String>> envSupplier = new EnvSupplier(entity, params); - Supplier<String> commandSupplier = new CommandSupplier(entity, params); + CommandSupplier commandSupplier = new CommandSupplier(entity, params); CommandPollConfig<T> pollConfig = new CommandPollConfig<T>(sensor) .env(envSupplier) @@ -163,12 +139,21 @@ public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> { standardPollConfig(entity, initParams(), pollConfig); - SshFeed feed = SshFeed.builder() + SshFeed.Builder feedBuilder = SshFeed.builder() .entity(entity) .onlyIfServiceUp(Maybe.ofDisallowingNull(EntityInitializers.resolve(params, ONLY_IF_SERVICE_UP)).or(true)) - .poll(pollConfig) - .build(); + .poll(pollConfig); + + String commandUrl = EntityInitializers.resolve(initParams(), SENSOR_COMMAND_URL); + if (commandUrl!=null) { + feedBuilder.commandUrlToInstallAndRun(commandUrl); + // commandSupplier above will be ignored + if (commandSupplier.rawSensorCommand!=null || commandSupplier.rawSensorExecDir!=null) { + throw new IllegalArgumentException("commandUrl is not compatible with command or executionDir"); + } + } + SshFeed feed = feedBuilder.build(); entity.addFeed(feed); // Deprecated; kept for backwards compatibility with historic persisted state diff --git a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java index 5a9921d166..75d8f71443 100644 --- a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java +++ b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java @@ -30,8 +30,10 @@ import java.util.concurrent.TimeUnit; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.location.MachineLocation; +import org.apache.brooklyn.api.mgmt.TaskFactory; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.entity.BrooklynConfigKeys; import org.apache.brooklyn.core.feed.AbstractFeed; import org.apache.brooklyn.core.feed.AttributePollHandler; import org.apache.brooklyn.core.feed.DelegatingPollHandler; @@ -40,6 +42,12 @@ import org.apache.brooklyn.core.location.Locations; import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor; import org.apache.brooklyn.feed.function.FunctionFeed; import org.apache.brooklyn.feed.ssh.SshPollValue; +import org.apache.brooklyn.location.ssh.SshMachineLocation; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.ssh.SshTasks; +import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.os.Os; +import org.apache.brooklyn.util.text.Identifiers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.brooklyn.util.time.Duration; @@ -95,7 +103,10 @@ public abstract class AbstractCommandFeed extends AbstractFeed { "machine"); public static final ConfigKey<Boolean> EXEC_AS_COMMAND = ConfigKeys.newBooleanConfigKey("execAsCommand"); - + public static final ConfigKey<String> COMMAND_URL = ConfigKeys.newStringConfigKey("commandUrl"); + + protected static final ConfigKey<String> COMMAND_URL_COPIED_AS = ConfigKeys.newStringConfigKey("commandUrlCopiedAs"); + @SuppressWarnings("serial") public static final ConfigKey<SetMultimap<CommandPollIdentifier, CommandPollConfig<?>>> POLLS = ConfigKeys.newConfigKey( new TypeToken<SetMultimap<CommandPollIdentifier, CommandPollConfig<?>>>() {}, @@ -108,6 +119,7 @@ public abstract class AbstractCommandFeed extends AbstractFeed { private Duration period = Duration.of(500, TimeUnit.MILLISECONDS); private boolean execAsCommand = false; private String uniqueTag; + private String commandUrlToInstallAndRun; private volatile boolean built; public B entity(Entity val) { @@ -139,6 +151,11 @@ public abstract class AbstractCommandFeed extends AbstractFeed { public abstract B poll(CommandPollConfig<?> config); public abstract List<CommandPollConfig<?>> getPolls(); + public B commandUrlToInstallAndRun(String commandUrl) { + this.commandUrlToInstallAndRun = commandUrl; + return self(); + } + public B execAsCommand() { execAsCommand = true; return self(); @@ -202,7 +219,8 @@ public abstract class AbstractCommandFeed extends AbstractFeed { config().set(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp); config().set(MACHINE, builder.machine); config().set(EXEC_AS_COMMAND, builder.execAsCommand); - + config().set(COMMAND_URL, builder.commandUrlToInstallAndRun); + SetMultimap<CommandPollIdentifier, CommandPollConfig<?>> polls = HashMultimap.<CommandPollIdentifier,CommandPollConfig<?>>create(); for (CommandPollConfig<?> config : (List<CommandPollConfig<?>>)builder.getPolls()) { @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -225,7 +243,11 @@ public abstract class AbstractCommandFeed extends AbstractFeed { @Override protected void preStart() { - getPoller().scheduleFeed(this, getConfig(POLLS), pollInfo -> () -> exec(pollInfo.command.get(), pollInfo.env.get())); + if (config().get(COMMAND_URL)!=null) { + getPoller().scheduleFeed(this, getConfig(POLLS), pollInfo -> () -> installAndExec(config().get(COMMAND_URL), pollInfo.env.get())); + } else { + getPoller().scheduleFeed(this, getConfig(POLLS), pollInfo -> () -> exec(pollInfo.command.get(), pollInfo.env.get())); + } } @Override @@ -235,4 +257,7 @@ public abstract class AbstractCommandFeed extends AbstractFeed { } protected abstract SshPollValue exec(String command, Map<String,String> env) throws IOException; + + protected abstract SshPollValue installAndExec(String commandUrl, Map<String,String> env) throws IOException; + } diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java index a17e0707f6..0f4642643c 100644 --- a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java +++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java @@ -21,15 +21,23 @@ package org.apache.brooklyn.feed.ssh; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.brooklyn.api.mgmt.TaskAdaptable; +import org.apache.brooklyn.api.mgmt.TaskFactory; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.entity.BrooklynConfigKeys; +import org.apache.brooklyn.core.location.Locations; import org.apache.brooklyn.feed.CommandPollConfig; import org.apache.brooklyn.location.ssh.SshMachineLocation; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.internal.ssh.SshTool; import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.ssh.SshTasks; import org.apache.brooklyn.util.core.task.ssh.internal.PlainSshExecTaskFactory; import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory; import org.apache.brooklyn.util.core.task.system.ProcessTaskStub.ScriptReturnType; import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; +import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.os.Os; +import org.apache.brooklyn.util.text.Identifiers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,4 +147,47 @@ public class SshFeed extends org.apache.brooklyn.feed.AbstractCommandFeed { return new SshPollValue(machine, task.getExitCode(), task.getStdout(), task.getStderr()); } + protected SshPollValue installAndExec(String commandUrl, Map<String,String> env) throws IOException { + String commandUrlCopiedAs = config().get(COMMAND_URL_COPIED_AS); + if (commandUrlCopiedAs==null) { + synchronized (this) { + commandUrlCopiedAs = config().get(COMMAND_URL_COPIED_AS); + if (commandUrlCopiedAs==null) { + String installDir = getEntity().sensors().get(BrooklynConfigKeys.INSTALL_DIR); + if (installDir == null) { + commandUrlCopiedAs = "brooklyn-ssh-command-url-" + entity.getApplicationId() + "-" + entity.getId() + "-" + Identifiers.makeRandomId(4) + ".sh"; + log.debug("Install dir not available at " + getEntity() + "; will use default/home directory for "+this+", in "+commandUrlCopiedAs); + } else { + commandUrlCopiedAs = Os.mergePathsUnix(installDir, "command-url-" + Identifiers.makeRandomId(4) + ".sh"); + } + + // Look for SshMachineLocation and install remote command script. + Maybe<SshMachineLocation> locationMaybe = Locations.findUniqueSshMachineLocation(entity.getLocations()); + if (locationMaybe.isPresent()) { + TaskFactory<?> install = SshTasks.installFromUrl(locationMaybe.get(), commandUrl, commandUrlCopiedAs); + DynamicTasks.queueIfPossible(install.newTask()).orSubmitAsync(entity).andWaitForSuccess(); + log.debug("Installed from "+commandUrl+" to "+commandUrlCopiedAs+" at "+getEntity()); + } else { + throw new IllegalStateException("Ssh machine location not available at " + getEntity() + "; skipping run of " + this); + } + + config().set(COMMAND_URL_COPIED_AS, commandUrlCopiedAs); + } + } + } + return exec("bash "+commandUrlCopiedAs, env); + } + + protected <T> void doReconfigureConfig(ConfigKey<T> key, T val) { + if (key.getName().equals(COMMAND_URL.getName())) { + config().set(COMMAND_URL_COPIED_AS, (String)null); + return; + } + if (key.getName().equals(COMMAND_URL_COPIED_AS.getName())) { + // allowed + return; + } + + super.doReconfigureConfig(key, val); + } } diff --git a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java index 4cf3afbed0..209b8e94dc 100644 --- a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java +++ b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java @@ -86,4 +86,11 @@ public class CmdFeed extends AbstractCommandFeed { return new SshPollValue(null, exitStatus, winRmToolResponse.getStdOut(), winRmToolResponse.getStdErr()); } + + @Override + protected SshPollValue installAndExec(String commandUrl, Map<String, String> env) throws IOException { + // TODO + throw new IllegalStateException("commandUrl not supported for WinRM cmds"); + } + }
