http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineAttributes.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineAttributes.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineAttributes.java new file mode 100644 index 0000000..a9c2d22 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineAttributes.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.machine; + +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.config.render.RendererHints; +import org.apache.brooklyn.sensor.core.Sensors; +import org.apache.brooklyn.util.guava.Functionals; +import org.apache.brooklyn.util.math.MathFunctions; +import org.apache.brooklyn.util.text.ByteSizeStrings; +import org.apache.brooklyn.util.time.Duration; + +import com.google.common.base.Function; + +public class MachineAttributes { + + /** + * Do not instantiate. + */ + private MachineAttributes() {} + + /* + * Sensor attributes for machines. + */ + + public static final AttributeSensor<Duration> UPTIME = Sensors.newSensor(Duration.class, "machine.uptime", "Current uptime"); + public static final AttributeSensor<Double> LOAD_AVERAGE = Sensors.newDoubleSensor("machine.loadAverage", "Current load average"); + + public static final AttributeSensor<Double> CPU_USAGE = Sensors.newDoubleSensor("machine.cpu", "Current CPU usage"); + public static final AttributeSensor<Double> AVERAGE_CPU_USAGE = Sensors.newDoubleSensor("cpu.average", "Average CPU usage across the cluster"); + + public static final AttributeSensor<Long> FREE_MEMORY = Sensors.newLongSensor("machine.memory.free", "Current free memory"); + public static final AttributeSensor<Long> TOTAL_MEMORY = Sensors.newLongSensor("machine.memory.total", "Total memory"); + public static final AttributeSensor<Long> USED_MEMORY = Sensors.newLongSensor("machine.memory.used", "Current memory usage"); + public static final AttributeSensor<Double> USED_MEMORY_DELTA_PER_SECOND_LAST = Sensors.newDoubleSensor("memory.used.delta", "Change in memory usage per second"); + public static final AttributeSensor<Double> USED_MEMORY_DELTA_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("memory.used.windowed", "Average change in memory usage over 30s"); + + private static AtomicBoolean initialized = new AtomicBoolean(false); + + /** + * Setup renderer hints. + */ + public static void init() { + if (initialized.getAndSet(true)) return; + + final Function<Double, Long> longValue = new Function<Double, Long>() { + @Override + public Long apply(@Nullable Double input) { + if (input == null) return null; + return input.longValue(); + } + }; + + RendererHints.register(CPU_USAGE, RendererHints.displayValue(MathFunctions.percent(2))); + RendererHints.register(AVERAGE_CPU_USAGE, RendererHints.displayValue(MathFunctions.percent(2))); + + RendererHints.register(FREE_MEMORY, RendererHints.displayValue(Functionals.chain(MathFunctions.times(1000L), ByteSizeStrings.metric()))); + RendererHints.register(TOTAL_MEMORY, RendererHints.displayValue(Functionals.chain(MathFunctions.times(1000L), ByteSizeStrings.metric()))); + RendererHints.register(USED_MEMORY, RendererHints.displayValue(Functionals.chain(MathFunctions.times(1000L), ByteSizeStrings.metric()))); + RendererHints.register(USED_MEMORY_DELTA_PER_SECOND_LAST, RendererHints.displayValue(Functionals.chain(longValue, ByteSizeStrings.metric()))); + RendererHints.register(USED_MEMORY_DELTA_PER_SECOND_IN_WINDOW, RendererHints.displayValue(Functionals.chain(longValue, ByteSizeStrings.metric()))); + } + + static { + init(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntity.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntity.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntity.java new file mode 100644 index 0000000..e24222a --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntity.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.machine; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.entity.ImplementedBy; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.effector.core.MethodEffector; +import org.apache.brooklyn.entity.annotation.Effector; +import org.apache.brooklyn.entity.annotation.EffectorParam; +import org.apache.brooklyn.entity.software.base.EmptySoftwareProcess; +import org.apache.brooklyn.util.time.Duration; + +@Catalog(name="Machine Entity", description="Represents a machine, providing metrics about it (e.g. obtained from ssh)") +@ImplementedBy(MachineEntityImpl.class) +public interface MachineEntity extends EmptySoftwareProcess { + + AttributeSensor<Duration> UPTIME = MachineAttributes.UPTIME; + AttributeSensor<Double> LOAD_AVERAGE = MachineAttributes.LOAD_AVERAGE; + AttributeSensor<Double> CPU_USAGE = MachineAttributes.CPU_USAGE; + AttributeSensor<Long> FREE_MEMORY = MachineAttributes.FREE_MEMORY; + AttributeSensor<Long> TOTAL_MEMORY = MachineAttributes.TOTAL_MEMORY; + AttributeSensor<Long> USED_MEMORY = MachineAttributes.USED_MEMORY; + + MethodEffector<String> EXEC_COMMAND = new MethodEffector<String>(MachineEntity.class, "execCommand"); + MethodEffector<String> EXEC_COMMAND_TIMEOUT = new MethodEffector<String>(MachineEntity.class, "execCommandTimeout"); + + /** + * Execute a command and return the output. + */ + @Effector(description = "Execute a command and return the output") + String execCommand( + @EffectorParam(name = "command", description = "Command") String command); + + /** + * Execute a command and return the output, or throw an exception after a timeout. + */ + @Effector(description = "Execute a command and return the output") + String execCommandTimeout( + @EffectorParam(name = "command", description = "Command") String command, + @EffectorParam(name = "timeout", description = "Timeout") Duration timeout); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntityImpl.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntityImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntityImpl.java new file mode 100644 index 0000000..d656102 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntityImpl.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.machine; + +import java.util.List; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver; +import org.apache.brooklyn.entity.software.base.EmptySoftwareProcessDriver; +import org.apache.brooklyn.entity.software.base.EmptySoftwareProcessImpl; +import org.apache.brooklyn.location.basic.Machines; +import org.apache.brooklyn.location.basic.SshMachineLocation; +import org.apache.brooklyn.sensor.feed.ssh.SshFeed; +import org.apache.brooklyn.sensor.feed.ssh.SshPollConfig; +import org.apache.brooklyn.sensor.feed.ssh.SshPollValue; +import org.apache.brooklyn.sensor.ssh.SshEffectorTasks; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.text.Strings; +import org.apache.brooklyn.util.time.Duration; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Splitter; + +public class MachineEntityImpl extends EmptySoftwareProcessImpl implements MachineEntity { + + private static final Logger LOG = LoggerFactory.getLogger(MachineEntityImpl.class); + + static { + MachineAttributes.init(); + } + + private transient SshFeed sensorFeed; + + @Override + public void init() { + LOG.info("Starting server pool machine with id {}", getId()); + super.init(); + } + + @Override + protected void connectSensors() { + super.connectSensors(); + + // Sensors linux-specific + if (!getMachine().getMachineDetails().getOsDetails().isLinux()) return; + + sensorFeed = SshFeed.builder() + .entity(this) + .period(Duration.THIRTY_SECONDS) + .poll(new SshPollConfig<Duration>(UPTIME) + .command("cat /proc/uptime") + .onFailureOrException(Functions.<Duration>constant(null)) + .onSuccess(new Function<SshPollValue, Duration>() { + @Override + public Duration apply(SshPollValue input) { + return Duration.seconds( Double.valueOf( Strings.getFirstWord(input.getStdout()) ) ); + } + })) + .poll(new SshPollConfig<Double>(LOAD_AVERAGE) + .command("uptime") + .onFailureOrException(Functions.constant(-1d)) + .onSuccess(new Function<SshPollValue, Double>() { + @Override + public Double apply(SshPollValue input) { + String loadAverage = Strings.getFirstWordAfter(input.getStdout(), "load average:").replace(",", ""); + return Double.valueOf(loadAverage); + } + })) + .poll(new SshPollConfig<Double>(CPU_USAGE) + .command("cat /proc/stat") + .onFailureOrException(Functions.constant(-1d)) + .onSuccess(new Function<SshPollValue, Double>() { + @Override + public Double apply(SshPollValue input) { + List<String> cpuData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout())); + Integer system = Integer.parseInt(cpuData.get(1)); + Integer user = Integer.parseInt(cpuData.get(3)); + Integer idle = Integer.parseInt(cpuData.get(4)); + return (double) (system + user) / (double) (system + user + idle); + } + })) + .poll(new SshPollConfig<Long>(USED_MEMORY) + .command("free | grep Mem:") + .onFailureOrException(Functions.constant(-1L)) + .onSuccess(new Function<SshPollValue, Long>() { + @Override + public Long apply(SshPollValue input) { + List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout())); + return Long.parseLong(memoryData.get(2)); + } + })) + .poll(new SshPollConfig<Long>(FREE_MEMORY) + .command("free | grep Mem:") + .onFailureOrException(Functions.constant(-1L)) + .onSuccess(new Function<SshPollValue, Long>() { + @Override + public Long apply(SshPollValue input) { + List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout())); + return Long.parseLong(memoryData.get(3)); + } + })) + .poll(new SshPollConfig<Long>(TOTAL_MEMORY) + .command("free | grep Mem:") + .onFailureOrException(Functions.constant(-1L)) + .onSuccess(new Function<SshPollValue, Long>() { + @Override + public Long apply(SshPollValue input) { + List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout())); + return Long.parseLong(memoryData.get(1)); + } + })) + .build(); + + } + + @Override + public void disconnectSensors() { + if (sensorFeed != null) sensorFeed.stop(); + super.disconnectSensors(); + } + + @Override + public Class<?> getDriverInterface() { + return EmptySoftwareProcessDriver.class; + } + + public SshMachineLocation getMachine() { + return Machines.findUniqueSshMachineLocation(getLocations()).get(); + } + + @Override + public String execCommand(String command) { + return execCommandTimeout(command, Duration.ONE_MINUTE); + } + + @Override + public String execCommandTimeout(String command, Duration timeout) { + ProcessTaskWrapper<String> task = SshEffectorTasks.ssh(command) + .environmentVariables(((AbstractSoftwareProcessSshDriver) getDriver()).getShellEnvironment()) + .requiringZeroAndReturningStdout() + .machine(getMachine()) + .summary(command) + .newTask(); + + try { + String result = DynamicTasks.queueIfPossible(task) + .executionContext(this) + .orSubmitAsync() + .asTask() + .get(timeout); + return result; + } catch (TimeoutException te) { + throw new IllegalStateException("Timed out running command: " + command); + } catch (Exception e) { + Integer exitCode = task.getExitCode(); + LOG.warn("Command failed, return code {}: {}", exitCode == null ? -1 : exitCode, task.getStderr()); + throw Exceptions.propagate(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java new file mode 100644 index 0000000..91b0411 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.machine; + +import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; +import org.apache.brooklyn.entity.core.EntityInternal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.Beta; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import org.apache.brooklyn.location.basic.SshMachineLocation; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.core.task.ssh.SshTasks; +import org.apache.brooklyn.util.net.Protocol; +import org.apache.brooklyn.util.ssh.BashCommands; +import org.apache.brooklyn.util.ssh.IptablesCommands; +import org.apache.brooklyn.util.ssh.IptablesCommands.Chain; +import org.apache.brooklyn.util.ssh.IptablesCommands.Policy; +import org.apache.brooklyn.util.text.Strings; + +/** + * + */ +@Beta +public class MachineInitTasks { + + // TODO Move somewhere so code can also be called by JcloudsLocation! + + private static final Logger log = LoggerFactory.getLogger(MachineInitTasks.class); + + protected EntityInternal entity() { + return (EntityInternal) BrooklynTaskTags.getTargetOrContextEntity(Tasks.current()); + } + + /** + * Returns a queued {@link Task} which opens the given ports in iptables on the given machine. + */ + public Task<Void> openIptablesAsync(final Iterable<Integer> inboundPorts, final SshMachineLocation machine) { + return DynamicTasks.queue("open iptables "+toTruncatedString(inboundPorts, 6), new Callable<Void>() { + public Void call() { + openIptablesImpl(inboundPorts, machine); + return null; + } + }); + } + + /** + * Returns a queued {@link Task} which stops iptables on the given machine. + */ + public Task<Void> stopIptablesAsync(final SshMachineLocation machine) { + return DynamicTasks.queue("stop iptables", new Callable<Void>() { + public Void call() { + stopIptablesImpl(machine); + return null; + } + }); + } + + /** + * See docs in {@link BashCommands#dontRequireTtyForSudo()} + */ + public Task<Boolean> dontRequireTtyForSudoAsync(final SshMachineLocation machine) { + return DynamicTasks.queue(SshTasks.dontRequireTtyForSudo(machine, true).newTask().asTask()); + } + + protected void openIptablesImpl(Iterable<Integer> inboundPorts, SshMachineLocation machine) { + if (inboundPorts == null || Iterables.isEmpty(inboundPorts)) { + log.info("No ports to open in iptables (no inbound ports) for {} at {}", machine, this); + } else { + log.info("Opening ports in iptables for {} at {}", entity(), machine); + + List<String> iptablesRules = Lists.newArrayList(); + + if (isLocationFirewalldEnabled(machine)) { + for (Integer port : inboundPorts) { + iptablesRules.add(IptablesCommands.addFirewalldRule(Chain.INPUT, Protocol.TCP, port, Policy.ACCEPT)); + } + } else { + iptablesRules = createIptablesRulesForNetworkInterface(inboundPorts); + iptablesRules.add(IptablesCommands.saveIptablesRules()); + } + List<String> batch = Lists.newArrayList(); + // Some entities, such as Riak (erlang based) have a huge range of ports, which leads to a script that + // is too large to run (fails with a broken pipe). Batch the rules into batches of 50 + for (String rule : iptablesRules) { + batch.add(rule); + if (batch.size() == 50) { + machine.execCommands("Inserting iptables rules, 50 command batch", batch); + batch.clear(); + } + } + if (batch.size() > 0) { + machine.execCommands("Inserting iptables rules", batch); + } + machine.execCommands("List iptables rules", ImmutableList.of(IptablesCommands.listIptablesRule())); + } + } + + protected void stopIptablesImpl(SshMachineLocation machine) { + log.info("Stopping iptables for {} at {}", entity(), machine); + + List<String> cmds = ImmutableList.<String>of(); + if (isLocationFirewalldEnabled(machine)) { + cmds = ImmutableList.of(IptablesCommands.firewalldServiceStop(), IptablesCommands.firewalldServiceStatus()); + } else { + cmds = ImmutableList.of(IptablesCommands.iptablesServiceStop(), IptablesCommands.iptablesServiceStatus()); + } + machine.execCommands("Stopping iptables", cmds); + } + + private List<String> createIptablesRulesForNetworkInterface(Iterable<Integer> ports) { + List<String> iptablesRules = Lists.newArrayList(); + for (Integer port : ports) { + iptablesRules.add(IptablesCommands.insertIptablesRule(Chain.INPUT, Protocol.TCP, port, Policy.ACCEPT)); + } + return iptablesRules; + } + + public boolean isLocationFirewalldEnabled(SshMachineLocation location) { + int result = location.execCommands("checking if firewalld is active", + ImmutableList.of(IptablesCommands.firewalldServiceIsActive())); + if (result == 0) { + return true; + } + + return false; + } + + protected String toTruncatedString(Iterable<?> vals, int maxShown) { + StringBuilder result = new StringBuilder("["); + int shown = 0; + for (Object val : (vals == null ? ImmutableList.of() : vals)) { + if (shown != 0) { + result.append(", "); + } + if (shown < maxShown) { + result.append(Strings.toString(val)); + shown++; + } else { + result.append("..."); + break; + } + } + result.append("]"); + return result.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/machine/ProvidesProvisioningFlags.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/ProvidesProvisioningFlags.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/ProvidesProvisioningFlags.java new file mode 100644 index 0000000..cad449a --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/ProvidesProvisioningFlags.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.machine; + +import org.apache.brooklyn.api.location.MachineProvisioningLocation; +import org.apache.brooklyn.entity.software.base.lifecycle.MachineLifecycleEffectorTasks; +import org.apache.brooklyn.util.core.config.ConfigBag; + +import com.google.common.annotations.Beta; + +/** Marker interface for an entity which supplies custom machine provisioning flags; + * used e.g. in {@link MachineLifecycleEffectorTasks}. + * @since 0.6.0 */ +@Beta +public interface ProvidesProvisioningFlags { + + public ConfigBag obtainProvisioningFlags(MachineProvisioningLocation<?> location); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPool.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPool.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPool.java new file mode 100644 index 0000000..548242f --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPool.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.machine.pool; + +import java.util.Collection; +import java.util.Map; + +import com.google.common.annotations.Beta; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.entity.ImplementedBy; +import org.apache.brooklyn.api.location.MachineLocation; +import org.apache.brooklyn.api.location.NoMachinesAvailableException; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.effector.core.MethodEffector; +import org.apache.brooklyn.entity.annotation.Effector; +import org.apache.brooklyn.entity.annotation.EffectorParam; +import org.apache.brooklyn.entity.group.DynamicCluster; +import org.apache.brooklyn.entity.machine.MachineEntity; +import org.apache.brooklyn.location.cloud.CloudLocationConfig; +import org.apache.brooklyn.location.dynamic.LocationOwner; +import org.apache.brooklyn.sensor.core.Sensors; + +/** + * A preallocated server pool is an entity that other applications can deploy to. + * Behaving as a cluster, the machines it creates for its members are reused. + * <p/> + * Notes: + * <ul> + * <li> + * The pool does not configure ports appropriately for applications subsequently + * deployed. If an entity that is to be run in the pool requires any ports open + * other than port 22 then thoses port should be configured with the + * {@link CloudLocationConfig#INBOUND_PORTS INBOUND_PORTS} + * config key as part of the pool's + * {@link org.apache.brooklyn.entity.software.base.SoftwareProcess#PROVISIONING_PROPERTIES PROVISIONING_PROPERTIES}. + * For example, in YAML: + * <pre> + * - type: brooklyn.entity.pool.ServerPool + * brooklyn.config: + * # Suitable for TomcatServers + * provisioning.properties: + * inboundPorts: [22, 31880, 8443, 8080, 31001, 1099] + * </pre> + * This is a limitation of Brooklyn that will be addressed in a future release. + * </li> + * </ul> + */ +@Catalog(name="Server Pool", description="Creates a pre-allocated server pool, which other applications can deploy to") +@ImplementedBy(ServerPoolImpl.class) +@Beta +public interface ServerPool extends DynamicCluster, LocationOwner<ServerPoolLocation, ServerPool> { + + ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.INITIAL_SIZE, 2); + + AttributeSensor<Integer> AVAILABLE_COUNT = Sensors.newIntegerSensor( + "pool.available", "The number of locations in the pool that are unused"); + + AttributeSensor<Integer> CLAIMED_COUNT = Sensors.newIntegerSensor( + "pool.claimed", "The number of locations in the pool that are in use"); + + ConfigKey<EntitySpec<?>> MEMBER_SPEC = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.MEMBER_SPEC, + EntitySpec.create(MachineEntity.class)); + + MethodEffector<Collection<Entity>> ADD_MACHINES_FROM_SPEC = new MethodEffector<Collection<Entity>>(ServerPool.class, "addExistingMachinesFromSpec"); + + public MachineLocation claimMachine(Map<?, ?> flags) throws NoMachinesAvailableException; + + public void releaseMachine(MachineLocation machine); + + /** + * Sets the pool to use an existing {@link MachineLocation} as a member. Existing locations + * will count towards the capacity of the pool but will not be terminated when the pool is + * stopped. + * @param machine An existing machine. + * @return the new member of the pool, created with the configured {@link #MEMBER_SPEC}. + */ + public Entity addExistingMachine(MachineLocation machine); + + /** + * Adds additional machines to the pool by resolving the given spec. + * @param spec + * A location spec, e.g. <code>byon:(hosts="[email protected],[email protected],[email protected]")</code> + * @return the new members of the pool, created with the configured {@link #MEMBER_SPEC}. + */ + @Effector(description = "Adds additional machines to the pool by resolving the given spec.") + public Collection<Entity> addExistingMachinesFromSpec( + @EffectorParam(name = "spec", description = "Spec") String spec); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolImpl.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolImpl.java new file mode 100644 index 0000000..d1d4bee --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolImpl.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.machine.pool; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.location.LocationDefinition; +import org.apache.brooklyn.api.location.MachineLocation; +import org.apache.brooklyn.api.location.NoMachinesAvailableException; +import org.apache.brooklyn.api.mgmt.LocationManager; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.effector.core.Effectors; +import org.apache.brooklyn.entity.core.Attributes; +import org.apache.brooklyn.entity.core.EntityInternal; +import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy; +import org.apache.brooklyn.entity.group.DynamicClusterImpl; +import org.apache.brooklyn.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.entity.trait.Startable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.brooklyn.location.basic.BasicLocationDefinition; +import org.apache.brooklyn.location.basic.Machines; +import org.apache.brooklyn.location.dynamic.DynamicLocation; +import org.apache.brooklyn.sensor.core.Sensors; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.guava.Maybe; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.reflect.TypeToken; + +public class ServerPoolImpl extends DynamicClusterImpl implements ServerPool { + + private static final Logger LOG = LoggerFactory.getLogger(ServerPoolImpl.class); + + private static enum MachinePoolMemberStatus { + /** The server is available for use */ + AVAILABLE, + /** The server has been leased to another application */ + CLAIMED, + /** + * The server will not be leased to other applications. It will be the first + * candidate to release when the pool is shrunk. + */ + UNUSABLE + } + + private static final AttributeSensor<MachinePoolMemberStatus> SERVER_STATUS = Sensors.newSensor(MachinePoolMemberStatus.class, + "pool.serverStatus", "The status of an entity in the pool"); + + // The sensors here would be better as private fields but there's not really a + // good way to manage their state when rebinding. + + /** Accesses must be synchronised by mutex */ + // Would use BiMap but persisting them tends to throw ConcurrentModificationExceptions. + @SuppressWarnings("serial") + public static final AttributeSensor<Map<Entity, MachineLocation>> ENTITY_MACHINE = Sensors.newSensor(new TypeToken<Map<Entity, MachineLocation>>() {}, + "pool.entityMachineMap", "A mapping of entities and their machine locations"); + + @SuppressWarnings("serial") + public static final AttributeSensor<Map<MachineLocation, Entity>> MACHINE_ENTITY = Sensors.newSensor(new TypeToken<Map<MachineLocation, Entity>>() {}, + "pool.machineEntityMap", "A mapping of machine locations and their entities"); + + public static final AttributeSensor<LocationDefinition> DYNAMIC_LOCATION_DEFINITION = Sensors.newSensor(LocationDefinition.class, + "pool.locationDefinition", "The location definition used to create the pool's dynamic location"); + + public static final ConfigKey<Boolean> REMOVABLE = ConfigKeys.newBooleanConfigKey( + "pool.member.removable", "Whether a pool member is removable from the cluster. Used to denote additional " + + "existing machines that were manually added to the pool", true); + + @SuppressWarnings("unused") + private MemberTrackingPolicy membershipTracker; + + @Override + public void init() { + super.init(); + setAttribute(AVAILABLE_COUNT, 0); + setAttribute(CLAIMED_COUNT, 0); + setAttribute(ENTITY_MACHINE, Maps.<Entity, MachineLocation>newHashMap()); + setAttribute(MACHINE_ENTITY, Maps.<MachineLocation, Entity>newHashMap()); + } + + @Override + public void start(Collection<? extends Location> locations) { + // super.start must happen before the policy is added else the initial + // members wont be up (and thus have a MachineLocation) when onEntityAdded + // is called. + super.start(locations); + createLocation(); + addMembershipTrackerPolicy(); + } + + @Override + public void rebind() { + super.rebind(); + addMembershipTrackerPolicy(); + createLocation(); + } + + @Override + public void stop() { + super.stop(); + deleteLocation(); + synchronized (mutex) { + setAttribute(AVAILABLE_COUNT, 0); + setAttribute(CLAIMED_COUNT, 0); + getAttribute(ENTITY_MACHINE).clear(); + getAttribute(MACHINE_ENTITY).clear(); + } + } + + private void addMembershipTrackerPolicy() { + membershipTracker = addPolicy(PolicySpec.create(MemberTrackingPolicy.class) + .displayName(getDisplayName() + " membership tracker") + .configure("group", this)); + } + + @Override + public ServerPoolLocation getDynamicLocation() { + return (ServerPoolLocation) getAttribute(DYNAMIC_LOCATION); + } + + protected ServerPoolLocation createLocation() { + return createLocation(MutableMap.<String, Object>builder() + .putAll(getConfig(LOCATION_FLAGS)) + .put(DynamicLocation.OWNER.getName(), this) + .build()); + } + + @Override + public ServerPoolLocation createLocation(Map<String, ?> flags) { + String locationName = getConfig(LOCATION_NAME); + if (locationName == null) { + String prefix = getConfig(LOCATION_NAME_PREFIX); + String suffix = getConfig(LOCATION_NAME_SUFFIX); + locationName = Joiner.on("-").skipNulls().join(prefix, getId(), suffix); + } + + String locationSpec = String.format(ServerPoolLocationResolver.POOL_SPEC, getId()) + String.format(":(name=\"%s\")", locationName); + LocationDefinition definition = new BasicLocationDefinition(locationName, locationSpec, flags); + getManagementContext().getLocationRegistry().updateDefinedLocation(definition); + Location location = getManagementContext().getLocationRegistry().resolve(definition); + LOG.info("Resolved and registered dynamic location {}: {}", locationName, location); + + setAttribute(LOCATION_SPEC, locationSpec); + setAttribute(DYNAMIC_LOCATION, location); + setAttribute(LOCATION_NAME, location.getId()); + setAttribute(DYNAMIC_LOCATION_DEFINITION, definition); + + return (ServerPoolLocation) location; + } + + @Override + public void deleteLocation() { + LocationManager mgr = getManagementContext().getLocationManager(); + ServerPoolLocation location = getDynamicLocation(); + if (mgr.isManaged(location)) { + LOG.debug("{} deleting and unmanaging location {}", this, location); + mgr.unmanage(location); + } + // definition will only be null if deleteLocation has already been called, e.g. by two calls to stop(). + LocationDefinition definition = getAttribute(DYNAMIC_LOCATION_DEFINITION); + if (definition != null) { + LOG.debug("{} unregistering dynamic location {}", this, definition); + getManagementContext().getLocationRegistry().removeDefinedLocation(definition.getId()); + } + setAttribute(LOCATION_SPEC, null); + setAttribute(DYNAMIC_LOCATION, null); + setAttribute(LOCATION_NAME, null); + setAttribute(DYNAMIC_LOCATION_DEFINITION, null); + } + + @Override + public boolean isLocationAvailable() { + // FIXME: What do true/false mean to callers? + // Is it valid to return false if availableMachines is empty? + return getDynamicLocation() != null; + } + + @Override + public MachineLocation claimMachine(Map<?, ?> flags) throws NoMachinesAvailableException { + LOG.info("Obtaining machine with flags: {}", Joiner.on(", ").withKeyValueSeparator("=").join(flags)); + synchronized (mutex) { + Optional<Entity> claimed = getMemberWithStatus(MachinePoolMemberStatus.AVAILABLE); + if (claimed.isPresent()) { + setEntityStatus(claimed.get(), MachinePoolMemberStatus.CLAIMED); + updateCountSensors(); + LOG.debug("{} has been claimed in {}", claimed, this); + return getEntityMachineMap().get(claimed.get()); + } else { + throw new NoMachinesAvailableException("No machines available in " + this); + } + } + } + + @Override + public void releaseMachine(MachineLocation machine) { + synchronized (mutex) { + Entity entity = getMachineEntityMap().get(machine); + if (entity == null) { + LOG.warn("{} releasing machine {} but its owning entity is not known!", this, machine); + } else { + setEntityStatus(entity, MachinePoolMemberStatus.AVAILABLE); + updateCountSensors(); + LOG.debug("{} has been released in {}", machine, this); + } + } + } + + @Override + public Entity addExistingMachine(MachineLocation machine) { + LOG.info("Adding additional machine to {}: {}", this, machine); + Entity added = addNode(machine, MutableMap.of(REMOVABLE, false)); + Map<String, ?> args = ImmutableMap.of("locations", ImmutableList.of(machine)); + Task<Void> task = Effectors.invocation(added, Startable.START, args).asTask(); + DynamicTasks.queueIfPossible(task).orSubmitAsync(this); + return added; + } + + @Override + public Collection<Entity> addExistingMachinesFromSpec(String spec) { + Location location = getManagementContext().getLocationRegistry().resolve(spec, true, null).orNull(); + List<Entity> additions = Lists.newLinkedList(); + if (location == null) { + LOG.warn("Spec was unresolvable: {}", spec); + } else { + Iterable<MachineLocation> machines = FluentIterable.from(location.getChildren()) + .filter(MachineLocation.class); + LOG.info("{} adding additional machines: {}", this, machines); + // Doesn't need to be synchronised on mutex: it will be claimed per-machine + // as the new members are handled by the membership tracking policy. + for (MachineLocation machine : machines) { + additions.add(addExistingMachine(machine)); + } + LOG.debug("{} added additional machines", this); + } + return additions; + } + + /** + * Overrides to restrict delta to the number of machines that can be <em>safely</em> + * removed (i.e. those that are {@link MachinePoolMemberStatus#UNUSABLE unusable} or + * {@link MachinePoolMemberStatus#AVAILABLE available}). + * <p/> + * Does not modify delta if the pool is stopping. + * @param delta Requested number of members to remove + * @return The entities that were removed + */ + @Override + protected Collection<Entity> shrink(int delta) { + if (Lifecycle.STOPPING.equals(getAttribute(Attributes.SERVICE_STATE_ACTUAL))) { + return super.shrink(delta); + } + + synchronized (mutex) { + int removable = 0; + for (Entity entity : getMembers()) { + // Skip machine marked not for removal and machines that are claimed + if (!Boolean.FALSE.equals(entity.getConfig(REMOVABLE)) && + !MachinePoolMemberStatus.CLAIMED.equals(entity.getAttribute(SERVER_STATUS))) { + removable -= 1; + } + } + + if (delta < removable) { + LOG.warn("Too few removable machines in {} to shrink by delta {}. Altered delta to {}", + new Object[]{this, delta, removable}); + delta = removable; + } + + Collection<Entity> removed = super.shrink(delta); + updateCountSensors(); + return removed; + } + } + + private Map<Entity, MachineLocation> getEntityMachineMap() { + return getAttribute(ENTITY_MACHINE); + } + + private Map<MachineLocation, Entity> getMachineEntityMap() { + return getAttribute(MACHINE_ENTITY); + } + + @Override + public Function<Collection<Entity>, Entity> getRemovalStrategy() { + return UNCLAIMED_REMOVAL_STRATEGY; + } + + private final Function<Collection<Entity>, Entity> UNCLAIMED_REMOVAL_STRATEGY = new Function<Collection<Entity>, Entity>() { + // Semantics of superclass mean that mutex should already be held when apply is called + @Override + public Entity apply(Collection<Entity> members) { + synchronized (mutex) { + Optional<Entity> choice; + if (Lifecycle.STOPPING.equals(getAttribute(Attributes.SERVICE_STATE_ACTUAL))) { + choice = Optional.of(members.iterator().next()); + } else { + // Otherwise should only choose between removable + unusable or available + choice = getMemberWithStatusExcludingUnremovable(members, MachinePoolMemberStatus.UNUSABLE) + .or(getMemberWithStatusExcludingUnremovable(members, MachinePoolMemberStatus.AVAILABLE)); + } + if (!choice.isPresent()) { + LOG.warn("{} has no machines available to remove!", this); + return null; + } else { + LOG.info("{} selected entity to remove from pool: {}", this, choice.get()); + choice.get().getAttribute(SERVER_STATUS); + setEntityStatus(choice.get(), null); + } + MachineLocation entityLocation = getEntityMachineMap().remove(choice.get()); + if (entityLocation != null) { + getMachineEntityMap().remove(entityLocation); + } + return choice.get(); + } + } + }; + + private void serverAdded(Entity member) { + Maybe<MachineLocation> machine = Machines.findUniqueMachineLocation(member.getLocations()); + if (member.getAttribute(SERVER_STATUS) != null) { + LOG.debug("Skipped addition of machine already in the pool: {}", member); + } else if (machine.isPresentAndNonNull()) { + MachineLocation m = machine.get(); + LOG.info("New machine in {}: {}", this, m); + setEntityStatus(member, MachinePoolMemberStatus.AVAILABLE); + synchronized (mutex) { + getEntityMachineMap().put(member, m); + getMachineEntityMap().put(m, member); + updateCountSensors(); + } + } else { + LOG.warn("Member added to {} that does not have a machine location; it will not be used by the pool: {}", + ServerPoolImpl.this, member); + setEntityStatus(member, MachinePoolMemberStatus.UNUSABLE); + } + } + + private void setEntityStatus(Entity entity, MachinePoolMemberStatus status) { + ((EntityInternal) entity).setAttribute(SERVER_STATUS, status); + } + + private Optional<Entity> getMemberWithStatus(MachinePoolMemberStatus status) { + return getMemberWithStatus0(getMembers(), status, true); + } + + private Optional<Entity> getMemberWithStatusExcludingUnremovable(Collection<Entity> entities, MachinePoolMemberStatus status) { + return getMemberWithStatus0(entities, status, false); + } + + private Optional<Entity> getMemberWithStatus0(Collection<Entity> entities, final MachinePoolMemberStatus status, final boolean includeUnremovableMachines) { + return Iterables.tryFind(entities, + new Predicate<Entity>() { + @Override + public boolean apply(Entity input) { + return (includeUnremovableMachines || isRemovable(input)) && + status.equals(input.getAttribute(SERVER_STATUS)); + } + }); + } + + /** @return true if the entity has {@link #REMOVABLE} set to null or true. */ + private boolean isRemovable(Entity entity) { + return !Boolean.FALSE.equals(entity.getConfig(REMOVABLE)); + } + + private void updateCountSensors() { + synchronized (mutex) { + int available = 0, claimed = 0; + for (Entity member : getMembers()) { + MachinePoolMemberStatus status = member.getAttribute(SERVER_STATUS); + if (MachinePoolMemberStatus.AVAILABLE.equals(status)) { + available++; + } else if (MachinePoolMemberStatus.CLAIMED.equals(status)) { + claimed++; + } + } + setAttribute(AVAILABLE_COUNT, available); + setAttribute(CLAIMED_COUNT, claimed); + } + } + + public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { + @Override + protected void onEntityEvent(EventType type, Entity member) { + Boolean isUp = member.getAttribute(Attributes.SERVICE_UP); + LOG.info("{} in {}: {} service up is {}", new Object[]{type.name(), entity, member, isUp}); + if (type.equals(EventType.ENTITY_ADDED) || type.equals(EventType.ENTITY_CHANGE)) { + if (Boolean.TRUE.equals(isUp)) { + ((ServerPoolImpl) entity).serverAdded(member); + } else if (LOG.isDebugEnabled()) { + LOG.debug("{} observed event {} but {} is not up (yet) and will not be used by the pool", + new Object[]{entity, type.name(), member}); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolLocation.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolLocation.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolLocation.java new file mode 100644 index 0000000..6c50f94 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolLocation.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.machine.pool; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +import org.apache.brooklyn.api.location.MachineLocation; +import org.apache.brooklyn.api.location.MachineProvisioningLocation; +import org.apache.brooklyn.api.location.NoMachinesAvailableException; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.location.basic.AbstractLocation; +import org.apache.brooklyn.location.dynamic.DynamicLocation; +import org.apache.brooklyn.util.core.flags.SetFromFlag; + +public class ServerPoolLocation extends AbstractLocation implements MachineProvisioningLocation<MachineLocation>, + DynamicLocation<ServerPool, ServerPoolLocation> { + + private static final long serialVersionUID = -6771844611899475409L; + + private static final Logger LOG = LoggerFactory.getLogger(ServerPoolLocation.class); + + @SetFromFlag("owner") + public static final ConfigKey<ServerPool> OWNER = ConfigKeys.newConfigKey( + ServerPool.class, "pool.location.owner"); + + @Override + public void init() { + LOG.debug("Initialising. Owner is: {}", checkNotNull(getConfig(OWNER), OWNER.getName())); + super.init(); + } + + @Override + public ServerPool getOwner() { + return getConfig(OWNER); + } + + @Override + public MachineLocation obtain(Map<?, ?> flags) throws NoMachinesAvailableException { + // Call server pool and try to obtain one of its machines + return getOwner().claimMachine(flags); + } + + @Override + public MachineProvisioningLocation<MachineLocation> newSubLocation(Map<?, ?> newFlags) { + throw new UnsupportedOperationException(); + } + + @Override + public void release(MachineLocation machine) { + getOwner().releaseMachine(machine); + } + + @Override + public Map<String, Object> getProvisioningFlags(Collection<String> tags) { + return Maps.newLinkedHashMap(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolLocationResolver.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolLocationResolver.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolLocationResolver.java new file mode 100644 index 0000000..4bcb1a3 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolLocationResolver.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.machine.pool; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.location.LocationRegistry; +import org.apache.brooklyn.api.location.LocationSpec; +import org.apache.brooklyn.api.location.LocationResolver.EnableableLocationResolver; +import org.apache.brooklyn.api.mgmt.ManagementContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +import org.apache.brooklyn.location.basic.BasicLocationRegistry; +import org.apache.brooklyn.location.basic.LocationInternal; +import org.apache.brooklyn.location.basic.LocationPropertiesFromBrooklynProperties; +import org.apache.brooklyn.location.dynamic.DynamicLocation; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.text.KeyValueParser; +import org.apache.brooklyn.util.text.Strings; + +public class ServerPoolLocationResolver implements EnableableLocationResolver { + + private static final Logger LOG = LoggerFactory.getLogger(ServerPoolLocationResolver.class); + private static final String PREFIX = "pool"; + public static final String POOL_SPEC = PREFIX + ":%s"; + private static final Pattern PATTERN = Pattern.compile("("+PREFIX+"|"+PREFIX.toUpperCase()+")" + + ":([a-zA-Z0-9]+)" + // pool Id + "(:\\((.*)\\))?$"); // arguments, e.g. displayName + + private static final Set<String> ACCEPTABLE_ARGS = ImmutableSet.of("name", "displayName"); + + private ManagementContext managementContext; + + @Override + public boolean isEnabled() { + return true; + } + + @Override + public void init(ManagementContext managementContext) { + this.managementContext = checkNotNull(managementContext, "managementContext"); + } + + @Override + public String getPrefix() { + return PREFIX; + } + + @Override + public boolean accepts(String spec, LocationRegistry registry) { + return BasicLocationRegistry.isResolverPrefixForSpec(this, spec, true); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public Location newLocationFromString(Map locationFlags, String spec, LocationRegistry registry) { + if (LOG.isDebugEnabled()) { + LOG.debug("Resolving location '" + spec + "' with flags " + Joiner.on(",").withKeyValueSeparator("=").join(locationFlags)); + } + String namedLocation = (String) locationFlags.get(LocationInternal.NAMED_SPEC_NAME.getName()); + + Matcher matcher = PATTERN.matcher(spec); + if (!matcher.matches()) { + String m = String.format("Invalid location '%s'; must specify either %s:entityId or %s:entityId:(key=argument)", + spec, PREFIX, PREFIX); + throw new IllegalArgumentException(m); + } + + String argsPart = matcher.group(4); + Map<String, String> argsMap = (argsPart != null) ? KeyValueParser.parseMap(argsPart) : Collections.<String,String>emptyMap(); + String displayNamePart = argsMap.get("displayName"); + String namePart = argsMap.get("name"); + + if (!ACCEPTABLE_ARGS.containsAll(argsMap.keySet())) { + Set<String> illegalArgs = Sets.difference(argsMap.keySet(), ACCEPTABLE_ARGS); + throw new IllegalArgumentException("Invalid location '"+spec+"'; illegal args "+illegalArgs+"; acceptable args are "+ACCEPTABLE_ARGS); + } + if (argsMap.containsKey("displayName") && Strings.isEmpty(displayNamePart)) { + throw new IllegalArgumentException("Invalid location '"+spec+"'; if displayName supplied then value must be non-empty"); + } + if (argsMap.containsKey("name") && Strings.isEmpty(namePart)) { + throw new IllegalArgumentException("Invalid location '"+spec+"'; if name supplied then value must be non-empty"); + } + + Map<String, Object> filteredProperties = new LocationPropertiesFromBrooklynProperties() + .getLocationProperties(PREFIX, namedLocation, registry.getProperties()); + MutableMap<String, Object> flags = MutableMap.<String, Object>builder() + .putAll(filteredProperties) + .putAll(locationFlags) + .build(); + + String poolId = matcher.group(2); + if (Strings.isBlank(poolId)) { + throw new IllegalArgumentException("Invalid location '"+spec+"'; pool's entity id must be non-empty"); + } + + final String displayName = displayNamePart != null ? displayNamePart : "Server Pool " + poolId; + final String locationName = namePart != null ? namePart : "serverpool-" + poolId; + + Entity pool = managementContext.getEntityManager().getEntity(poolId); + LocationSpec<ServerPoolLocation> locationSpec = LocationSpec.create(ServerPoolLocation.class) + .configure(flags) + .configure(DynamicLocation.OWNER, pool) + .configure(LocationInternal.NAMED_SPEC_NAME, locationName) + .displayName(displayName); + return managementContext.getLocationManager().createLocation(locationSpec); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java new file mode 100644 index 0000000..5efd7c2 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.software.base; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis; + +import java.io.File; +import java.io.InputStream; +import java.io.Reader; +import java.io.StringReader; +import java.util.Map; + +import org.apache.brooklyn.api.internal.EntityLocal; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.entity.core.BrooklynConfigKeys; +import org.apache.brooklyn.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.entity.lifecycle.ServiceStateLogic; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.ResourceUtils; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.core.text.TemplateProcessor; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.os.Os; +import org.apache.brooklyn.util.stream.ReaderInputStream; +import org.apache.brooklyn.util.text.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.Beta; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; + +/** + * An abstract implementation of the {@link SoftwareProcessDriver}. + */ +public abstract class AbstractSoftwareProcessDriver implements SoftwareProcessDriver { + + private static final Logger log = LoggerFactory.getLogger(AbstractSoftwareProcessDriver.class); + + protected final EntityLocal entity; + protected final ResourceUtils resource; + protected final Location location; + + public AbstractSoftwareProcessDriver(EntityLocal entity, Location location) { + this.entity = checkNotNull(entity, "entity"); + this.location = checkNotNull(location, "location"); + this.resource = ResourceUtils.create(entity); + } + + /* + * (non-Javadoc) + * @see brooklyn.entity.basic.SoftwareProcessDriver#rebind() + */ + @Override + public void rebind() { + // no-op + } + + /** + * Start the entity. + * <p> + * This installs, configures and launches the application process. However, + * users can also call the {@link #install()}, {@link #customize()} and + * {@link #launch()} steps independently. The {@link #postLaunch()} will + * be called after the {@link #launch()} metheod is executed, but the + * process may not be completely initialised at this stage, so care is + * required when implementing these stages. + * <p> + * The {@link BrooklynConfigKeys#ENTITY_RUNNING} key can be set on the location + * or the entity to skip the startup process if the entity is already running, + * according to the {@link #isRunning()} method. To force the startup to be + * skipped, {@link BrooklynConfigKeys#SKIP_ENTITY_START} can be set on the entity. + * The {@link BrooklynConfigKeys#SKIP_ENTITY_INSTALLATION} key can also be used to + * skip the {@link #setup()}, {@link #copyInstallResources()} and + * {@link #install()} methods if set on the entity or location. + * + * @see #stop() + */ + @Override + public void start() { + boolean skipStart = false; + Optional<Boolean> locationRunning = Optional.fromNullable(getLocation().getConfig(BrooklynConfigKeys.SKIP_ENTITY_START_IF_RUNNING)); + Optional<Boolean> entityRunning = Optional.fromNullable(entity.getConfig(BrooklynConfigKeys.SKIP_ENTITY_START_IF_RUNNING)); + Optional<Boolean> entityStarted = Optional.fromNullable(entity.getConfig(BrooklynConfigKeys.SKIP_ENTITY_START)); + if (locationRunning.or(entityRunning).or(false)) { + skipStart = isRunning(); + } else { + skipStart = entityStarted.or(false); + } + if (!skipStart) { + DynamicTasks.queue("copy-pre-install-resources", new Runnable() { public void run() { + waitForConfigKey(BrooklynConfigKeys.PRE_INSTALL_RESOURCES_LATCH); + copyPreInstallResources(); + }}); + + DynamicTasks.queue("pre-install", new Runnable() { public void run() { + preInstall(); + }}); + + if (Strings.isNonBlank(entity.getConfig(BrooklynConfigKeys.PRE_INSTALL_COMMAND))) { + DynamicTasks.queue("pre-install-command", new Runnable() { public void run() { + runPreInstallCommand(entity.getConfig(BrooklynConfigKeys.PRE_INSTALL_COMMAND)); + }}); + }; + + Optional<Boolean> locationInstalled = Optional.fromNullable(getLocation().getConfig(BrooklynConfigKeys.SKIP_ENTITY_INSTALLATION)); + Optional<Boolean> entityInstalled = Optional.fromNullable(entity.getConfig(BrooklynConfigKeys.SKIP_ENTITY_INSTALLATION)); + boolean skipInstall = locationInstalled.or(entityInstalled).or(false); + if (!skipInstall) { + DynamicTasks.queue("setup", new Runnable() { public void run() { + waitForConfigKey(BrooklynConfigKeys.SETUP_LATCH); + setup(); + }}); + + DynamicTasks.queue("copy-install-resources", new Runnable() { public void run() { + waitForConfigKey(BrooklynConfigKeys.INSTALL_RESOURCES_LATCH); + copyInstallResources(); + }}); + + DynamicTasks.queue("install", new Runnable() { public void run() { + waitForConfigKey(BrooklynConfigKeys.INSTALL_LATCH); + install(); + }}); + } + + if (Strings.isNonBlank(entity.getConfig(BrooklynConfigKeys.POST_INSTALL_COMMAND))) { + DynamicTasks.queue("post-install-command", new Runnable() { public void run() { + runPostInstallCommand(entity.getConfig(BrooklynConfigKeys.POST_INSTALL_COMMAND)); + }}); + } + + DynamicTasks.queue("customize", new Runnable() { public void run() { + waitForConfigKey(BrooklynConfigKeys.CUSTOMIZE_LATCH); + customize(); + }}); + + DynamicTasks.queue("copy-runtime-resources", new Runnable() { public void run() { + waitForConfigKey(BrooklynConfigKeys.RUNTIME_RESOURCES_LATCH); + copyRuntimeResources(); + }}); + + if (Strings.isNonBlank(entity.getConfig(BrooklynConfigKeys.PRE_LAUNCH_COMMAND))) { + DynamicTasks.queue("pre-launch-command", new Runnable() { public void run() { + runPreLaunchCommand(entity.getConfig(BrooklynConfigKeys.PRE_LAUNCH_COMMAND)); + }}); + }; + + DynamicTasks.queue("launch", new Runnable() { public void run() { + waitForConfigKey(BrooklynConfigKeys.LAUNCH_LATCH); + launch(); + }}); + + if (Strings.isNonBlank(entity.getConfig(BrooklynConfigKeys.POST_LAUNCH_COMMAND))) { + DynamicTasks.queue("post-launch-command", new Runnable() { public void run() { + runPostLaunchCommand(entity.getConfig(BrooklynConfigKeys.POST_LAUNCH_COMMAND)); + }}); + }; + } + + DynamicTasks.queue("post-launch", new Runnable() { public void run() { + postLaunch(); + }}); + } + + @Override + public abstract void stop(); + + /** + * Implement this method in child classes to add some pre-install behavior + */ + public void preInstall() {} + + public abstract void runPreInstallCommand(String command); + public abstract void setup(); + public abstract void install(); + public abstract void runPostInstallCommand(String command); + public abstract void customize(); + public abstract void runPreLaunchCommand(String command); + public abstract void launch(); + public abstract void runPostLaunchCommand(String command); + + @Override + public void kill() { + stop(); + } + + /** + * Implement this method in child classes to add some post-launch behavior + */ + public void postLaunch() {} + + @Override + public void restart() { + DynamicTasks.queue("stop (best effort)", new Runnable() { + public void run() { + DynamicTasks.markInessential(); + boolean previouslyRunning = isRunning(); + try { + ServiceStateLogic.setExpectedState(getEntity(), Lifecycle.STOPPING); + stop(); + } catch (Exception e) { + // queue a failed task so that there is visual indication that this task had a failure, + // without interrupting the parent + if (previouslyRunning) { + log.warn(getEntity() + " restart: stop failed, when was previously running (ignoring)", e); + DynamicTasks.queue(Tasks.fail("Primary job failure (when previously running)", e)); + } else { + log.debug(getEntity() + " restart: stop failed (but was not previously running, so not a surprise)", e); + DynamicTasks.queue(Tasks.fail("Primary job failure (when not previously running)", e)); + } + // the above queued tasks will cause this task to be indicated as failed, with an indication of severity + } + } + }); + + if (doFullStartOnRestart()) { + DynamicTasks.waitForLast(); + ServiceStateLogic.setExpectedState(getEntity(), Lifecycle.STARTING); + start(); + } else { + DynamicTasks.queue("launch", new Runnable() { public void run() { + ServiceStateLogic.setExpectedState(getEntity(), Lifecycle.STARTING); + launch(); + }}); + DynamicTasks.queue("post-launch", new Runnable() { public void run() { + postLaunch(); + }}); + } + } + + @Beta + /** ideally restart() would take options, e.g. whether to do full start, skip installs, etc; + * however in the absence here is a toggle - not sure how well it works; + * default is false which is similar to previous behaviour (with some seemingly-obvious tidies), + * meaning install and configure will NOT be done on restart. */ + protected boolean doFullStartOnRestart() { + return false; + } + + @Override + public EntityLocal getEntity() { return entity; } + + @Override + public Location getLocation() { return location; } + + public InputStream getResource(String url) { + return resource.getResourceFromUrl(url); + } + + /** + * Files and templates to be copied to the server <em>before</em> pre-install. This allows the {@link #preInstall()} + * process to have access to all required resources. + * <p> + * Will be prefixed with the entity's {@link #getInstallDir() install directory} if relative. + * + * @see SoftwareProcess#PRE_INSTALL_FILES + * @see SoftwareProcess#PRE_INSTALL_TEMPLATES + * @see #copyRuntimeResources() + */ + public void copyPreInstallResources() { + copyResources(entity.getConfig(SoftwareProcess.PRE_INSTALL_FILES), entity.getConfig(SoftwareProcess.PRE_INSTALL_TEMPLATES)); + } + + /** + * Files and templates to be copied to the server <em>before</em> installation. This allows the {@link #install()} + * process to have access to all required resources. + * <p> + * Will be prefixed with the entity's {@link #getInstallDir() install directory} if relative. + * + * @see SoftwareProcess#INSTALL_FILES + * @see SoftwareProcess#INSTALL_TEMPLATES + * @see #copyRuntimeResources() + */ + public void copyInstallResources() { + copyResources(entity.getConfig(SoftwareProcess.INSTALL_FILES), entity.getConfig(SoftwareProcess.INSTALL_TEMPLATES)); + } + + private void copyResources(Map<String, String> files, Map<String, String> templates) { + // Ensure environment variables are not looked up here, otherwise sub-classes might + // lookup port numbers and fail with ugly error if port is not set; better to wait + // until in Entity's code (e.g. customize) where such checks are done explicitly. + + boolean hasAnythingToCopy = ((files != null && files.size() > 0) || (templates != null && templates.size() > 0)); + if (hasAnythingToCopy) { + createDirectory(getInstallDir(), "create install directory"); + + // TODO see comment in copyResource, that should be queued as a task like the above + // (better reporting in activities console) + + if (files != null && files.size() > 0) { + for (String source : files.keySet()) { + String target = files.get(source); + String destination = Os.isAbsolutish(target) ? target : Os.mergePathsUnix(getInstallDir(), target); + copyResource(source, destination, true); + } + } + + if (templates != null && templates.size() > 0) { + for (String source : templates.keySet()) { + String target = templates.get(source); + String destination = Os.isAbsolutish(target) ? target : Os.mergePathsUnix(getInstallDir(), target); + copyTemplate(source, destination, true, MutableMap.<String, Object>of()); + } + } + } + } + + protected abstract void createDirectory(String directoryName, String summaryForLogging); + + /** + * Files and templates to be copied to the server <em>after</em> customisation. This allows overwriting of + * existing files such as entity configuration which may be copied from the installation directory + * during the {@link #customize()} process. + * <p> + * Will be prefixed with the entity's {@link #getRunDir() run directory} if relative. + * + * @see SoftwareProcess#RUNTIME_FILES + * @see SoftwareProcess#RUNTIME_TEMPLATES + * @see #copyInstallResources() + */ + public void copyRuntimeResources() { + try { + createDirectory(getRunDir(), "create run directory"); + + Map<String, String> runtimeFiles = entity.getConfig(SoftwareProcess.RUNTIME_FILES); + if (runtimeFiles != null && runtimeFiles.size() > 0) { + for (String source : runtimeFiles.keySet()) { + String target = runtimeFiles.get(source); + String destination = Os.isAbsolutish(target) ? target : Os.mergePathsUnix(getRunDir(), target); + copyResource(source, destination, true); + } + } + + Map<String, String> runtimeTemplates = entity.getConfig(SoftwareProcess.RUNTIME_TEMPLATES); + if (runtimeTemplates != null && runtimeTemplates.size() > 0) { + for (String source : runtimeTemplates.keySet()) { + String target = runtimeTemplates.get(source); + String destination = Os.isAbsolutish(target) ? target : Os.mergePathsUnix(getRunDir(), target); + copyTemplate(source, destination, true, MutableMap.<String, Object>of()); + } + } + } catch (Exception e) { + log.warn("Error copying runtime resources", e); + throw Exceptions.propagate(e); + } + } + + /** + * @param template File to template and copy. + * @param target Destination on server. + * @return The exit code the SSH command run. + */ + public int copyTemplate(File template, String target) { + return copyTemplate(template.toURI().toASCIIString(), target); + } + + /** + * @param template URI of file to template and copy, e.g. file://.., http://.., classpath://.. + * @param target Destination on server. + * @return The exit code of the SSH command run. + */ + public int copyTemplate(String template, String target) { + return copyTemplate(template, target, false, ImmutableMap.<String, String>of()); + } + + /** + * @param template URI of file to template and copy, e.g. file://.., http://.., classpath://.. + * @param target Destination on server. + * @param extraSubstitutions Extra substitutions for the templater to use, for example + * "foo" -> "bar", and in a template ${foo}. + * @return The exit code of the SSH command run. + */ + public int copyTemplate(String template, String target, boolean createParent, Map<String, ?> extraSubstitutions) { + String data = processTemplate(template, extraSubstitutions); + return copyResource(MutableMap.<Object,Object>of(), new StringReader(data), target, createParent); + } + + public abstract int copyResource(Map<Object,Object> sshFlags, String source, String target, boolean createParentDir); + + public abstract int copyResource(Map<Object,Object> sshFlags, InputStream source, String target, boolean createParentDir); + + /** + * @param file File to copy. + * @param target Destination on server. + * @return The exit code the SSH command run. + */ + public int copyResource(File file, String target) { + return copyResource(file.toURI().toASCIIString(), target); + } + + /** + * @param resource URI of file to copy, e.g. file://.., http://.., classpath://.. + * @param target Destination on server. + * @return The exit code of the SSH command run + */ + public int copyResource(String resource, String target) { + return copyResource(MutableMap.of(), resource, target); + } + + public int copyResource(String resource, String target, boolean createParentDir) { + return copyResource(MutableMap.of(), resource, target, createParentDir); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public int copyResource(Map sshFlags, String source, String target) { + return copyResource(sshFlags, source, target, false); + } + + /** + * @see #copyResource(Map, InputStream, String, boolean) + */ + public int copyResource(Reader source, String target) { + return copyResource(MutableMap.of(), source, target, false); + } + + /** + * @see #copyResource(Map, InputStream, String, boolean) + */ + public int copyResource(Map<Object,Object> sshFlags, Reader source, String target, boolean createParent) { + return copyResource(sshFlags, new ReaderInputStream(source), target, createParent); + } + + /** + * @see #copyResource(Map, InputStream, String, boolean) + */ + public int copyResource(InputStream source, String target) { + return copyResource(MutableMap.of(), source, target, false); + } + + public String getResourceAsString(String url) { + return resource.getResourceAsString(url); + } + + public String processTemplate(File templateConfigFile, Map<String,Object> extraSubstitutions) { + return processTemplate(templateConfigFile.toURI().toASCIIString(), extraSubstitutions); + } + + public String processTemplate(File templateConfigFile) { + return processTemplate(templateConfigFile.toURI().toASCIIString()); + } + + /** Takes the contents of a template file from the given URL (often a classpath://com/myco/myprod/myfile.conf or .sh) + * and replaces "${entity.xxx}" with the result of entity.getXxx() and similar for other driver, location; + * as well as replacing config keys on the management context + * <p> + * uses Freemarker templates under the covers + **/ + public String processTemplate(String templateConfigUrl) { + return processTemplate(templateConfigUrl, ImmutableMap.<String,String>of()); + } + + public String processTemplate(String templateConfigUrl, Map<String,? extends Object> extraSubstitutions) { + return processTemplateContents(getResourceAsString(templateConfigUrl), extraSubstitutions); + } + + public String processTemplateContents(String templateContents) { + return processTemplateContents(templateContents, ImmutableMap.<String,String>of()); + } + + public String processTemplateContents(String templateContents, Map<String,? extends Object> extraSubstitutions) { + return TemplateProcessor.processTemplateContents(templateContents, this, extraSubstitutions); + } + + protected void waitForConfigKey(ConfigKey<?> configKey) { + Object val = entity.getConfig(configKey); + if (val != null) log.debug("{} finished waiting for {} (value {}); continuing...", new Object[] {this, configKey, val}); + } + + /** + * @deprecated since 0.5.0; instead rely on {@link org.apache.brooklyn.api.entity.drivers.downloads.DownloadResolverManager} to include local-repo, such as: + * + * <pre> + * {@code + * DownloadResolver resolver = Entities.newDownloader(this); + * List<String> urls = resolver.getTargets(); + * } + * </pre> + */ + protected String getEntityVersionLabel() { + return getEntityVersionLabel("_"); + } + + /** + * @deprecated since 0.5.0; instead rely on {@link org.apache.brooklyn.api.entity.drivers.downloads.DownloadResolverManager} to include local-repo + */ + protected String getEntityVersionLabel(String separator) { + return elvis(entity.getEntityType().getSimpleName(), + entity.getClass().getName())+(getVersion() != null ? separator+getVersion() : ""); + } + + public String getVersion() { + return getEntity().getConfig(SoftwareProcess.SUGGESTED_VERSION); + } + + public abstract String getRunDir(); + public abstract String getInstallDir(); +}
