http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e2c57058/core/src/main/java/org/apache/brooklyn/location/basic/PortRanges.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/location/basic/PortRanges.java b/core/src/main/java/org/apache/brooklyn/location/basic/PortRanges.java new file mode 100644 index 0000000..6a17c1a --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/location/basic/PortRanges.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.location.basic; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.brooklyn.location.PortRange; +import brooklyn.util.flags.TypeCoercions; + +import com.google.common.base.Function; +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +public class PortRanges { + + public static final int MAX_PORT = 65535; + public static final PortRange ANY_HIGH_PORT = new LinearPortRange(1024, MAX_PORT); + + public static class SinglePort implements PortRange, Serializable { + private static final long serialVersionUID = 7446781416534230401L; + + final int port; + private SinglePort(int port) { this.port = port; } + + @Override + public Iterator<Integer> iterator() { + return Collections.singletonList(port).iterator(); + } + @Override + public boolean isEmpty() { + return false; + } + @Override + public boolean asBoolean() { + return true; + } + @Override + public String toString() { + return //getClass().getName()+"["+ + ""+port; //+"]"; + } + public int hashCode() { + return Objects.hashCode(port); + } + @Override + public boolean equals(Object obj) { + return (obj instanceof SinglePort) && port == ((SinglePort)obj).port; + } + } + + public static class LinearPortRange implements PortRange, Serializable { + private static final long serialVersionUID = -9165280509363743508L; + + final int start, end, delta; + private LinearPortRange(int start, int end, int delta) { + this.start = start; + this.end = end; + this.delta = delta; + checkArgument(start > 0 && start <= MAX_PORT, "start port %s out of range", start); + checkArgument(end > 0 && end <= MAX_PORT, "end port %s out of range", end); + checkArgument(delta > 0 ? start <= end : start >= end, "start and end out of order: %s to %s, delta %s", start, end, delta); + checkArgument(delta != 0, "delta must be non-zero"); + } + public LinearPortRange(int start, int end) { + this(start, end, (start<=end?1:-1)); + } + + @Override + public Iterator<Integer> iterator() { + return new Iterator<Integer>() { + int next = start; + boolean hasNext = true; + + @Override + public boolean hasNext() { + return hasNext; + } + + @Override + public Integer next() { + if (!hasNext) + throw new NoSuchElementException("Exhausted available ports"); + int result = next; + next += delta; + if ((delta>0 && next>end) || (delta<0 && next<end)) hasNext = false; + return result; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public boolean isEmpty() { + return false; + } + @Override + public boolean asBoolean() { + return true; + } + @Override + public String toString() { + return //getClass().getName()+"["+ + start+"-"+end; //+"]"; + } + @Override + public int hashCode() { + return Objects.hashCode(start, end, delta); + } + @Override + public boolean equals(Object obj) { + if (!(obj instanceof LinearPortRange)) return false; + LinearPortRange o = (LinearPortRange) obj; + return start == o.start && end == o.end && delta == o.delta; + } + } + + public static class AggregatePortRange implements PortRange, Serializable { + private static final long serialVersionUID = 7332682500816739660L; + + final List<PortRange> ranges; + private AggregatePortRange(List<PortRange> ranges) { + this.ranges = ImmutableList.copyOf(ranges); + } + @Override + public Iterator<Integer> iterator() { + return Iterables.concat(ranges).iterator(); + } + @Override + public boolean isEmpty() { + for (PortRange r: ranges) + if (!r.isEmpty()) return false; + return true; + } + @Override + public boolean asBoolean() { + return !isEmpty(); + } + @Override + public String toString() { + String s = ""; + for (PortRange r: ranges) { + if (s.length()>0) s+=","; + s += r; + } + return //getClass().getName()+"["+ + s; //+"]"; + } + public int hashCode() { + return Objects.hashCode(ranges); + } + @Override + public boolean equals(Object obj) { + return (obj instanceof AggregatePortRange) && ranges.equals(((AggregatePortRange)obj).ranges); + } + } + + public static PortRange fromInteger(int x) { + return new SinglePort(x); + } + + public static PortRange fromCollection(Collection<?> c) { + List<PortRange> l = new ArrayList<PortRange>(); + for (Object o: c) { + if (o instanceof Integer) l.add(fromInteger((Integer)o)); + else if (o instanceof String) l.add(fromString((String)o)); + else if (o instanceof Collection) l.add(fromCollection((Collection<?>)o)); + else l.add(TypeCoercions.coerce(o, PortRange.class)); + } + return new AggregatePortRange(l); + } + + /** parses a string representation of ports, as "80,8080,8000,8080-8099" */ + public static PortRange fromString(String s) { + List<PortRange> l = new ArrayList<PortRange>(); + for (String si: s.split(",")) { + si = si.trim(); + int start, end; + if (si.endsWith("+")) { + String si2 = si.substring(0, si.length()-1).trim(); + start = Integer.parseInt(si2); + end = MAX_PORT; + } else if (si.indexOf('-')>0) { + int v = si.indexOf('-'); + start = Integer.parseInt(si.substring(0, v).trim()); + end = Integer.parseInt(si.substring(v+1).trim()); + } else if (si.length()==0) { + //nothing, ie empty range, just continue + continue; + } else { + //should be number on its own + l.add(new SinglePort(Integer.parseInt(si))); + continue; + } + l.add(new LinearPortRange(start, end)); + } + if (l.size() == 1) { + return l.get(0); + } else { + return new AggregatePortRange(l); + } + } + + private static AtomicBoolean initialized = new AtomicBoolean(false); + /** performs the language extensions required for this project */ + @SuppressWarnings("rawtypes") + public static void init() { + if (initialized.get()) return; + synchronized (initialized) { + if (initialized.get()) return; + TypeCoercions.registerAdapter(Integer.class, PortRange.class, new Function<Integer,PortRange>() { + public PortRange apply(Integer x) { return fromInteger(x); } + }); + TypeCoercions.registerAdapter(String.class, PortRange.class, new Function<String,PortRange>() { + public PortRange apply(String x) { return fromString(x); } + }); + TypeCoercions.registerAdapter(Collection.class, PortRange.class, new Function<Collection,PortRange>() { + public PortRange apply(Collection x) { return fromCollection(x); } + }); + initialized.set(true); + } + } + + static { + init(); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e2c57058/core/src/main/java/org/apache/brooklyn/location/basic/RegistryLocationResolver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/location/basic/RegistryLocationResolver.java b/core/src/main/java/org/apache/brooklyn/location/basic/RegistryLocationResolver.java new file mode 100644 index 0000000..06558cb --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/location/basic/RegistryLocationResolver.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.location.basic; + +import java.util.Map; + +import org.apache.brooklyn.location.Location; +import org.apache.brooklyn.location.LocationResolver; +import org.apache.brooklyn.location.LocationRegistry; + +/** + * Extension to LocationResolver which can take a registry. + * + * @deprecated since 0.6; the LocationResolver always takes the LocationRegistry now + */ +@Deprecated +public interface RegistryLocationResolver extends LocationResolver { + + @Override + @SuppressWarnings("rawtypes") + Location newLocationFromString(Map locationFlags, String spec, LocationRegistry registry); + + @Override + boolean accepts(String spec, LocationRegistry registry); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e2c57058/core/src/main/java/org/apache/brooklyn/location/basic/SingleMachineLocationResolver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/location/basic/SingleMachineLocationResolver.java b/core/src/main/java/org/apache/brooklyn/location/basic/SingleMachineLocationResolver.java new file mode 100644 index 0000000..4be135f --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/location/basic/SingleMachineLocationResolver.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.location.basic; + +import java.util.Map; + +import org.apache.brooklyn.location.Location; +import org.apache.brooklyn.location.LocationRegistry; +import org.apache.brooklyn.location.LocationSpec; +import brooklyn.util.config.ConfigBag; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.guava.Maybe; +import brooklyn.util.guava.Maybe.Absent; + +public class SingleMachineLocationResolver extends AbstractLocationResolver { + + private static final String SINGLE = "single"; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public Location newLocationFromString(Map locationFlags, String spec, LocationRegistry registry) { + ConfigBag config = extractConfig(locationFlags, spec, registry); + Map globalProperties = registry.getProperties(); + String namedLocation = (String) locationFlags.get(LocationInternal.NAMED_SPEC_NAME.getName()); + + if (registry != null) { + LocationPropertiesFromBrooklynProperties.setLocalTempDir(globalProperties, config); + } + + if (config.getStringKey("target") == null) { + throw new IllegalArgumentException("target must be specified in single-machine spec"); + } + String target = config.getStringKey("target").toString(); + config.remove("target"); + Maybe<Location> testResolve = managementContext.getLocationRegistry().resolve(target, false, null); + if (!testResolve.isPresent()) { + throw new IllegalArgumentException("Invalid target location '" + target + "' for location '"+SINGLE+"': "+ + Exceptions.collapseText( ((Absent<?>)testResolve).getException() )); + } + + return managementContext.getLocationManager().createLocation(LocationSpec.create(SingleMachineProvisioningLocation.class) + .configure("location", target) + .configure("locationFlags", config.getAllConfig()) + .configure(LocationConfigUtils.finalAndOriginalSpecs(spec, locationFlags, globalProperties, namedLocation))); + } + + @Override + public String getPrefix() { + return SINGLE; + } + + @Override + protected Class<? extends Location> getLocationType() { + return SingleMachineProvisioningLocation.class; + } + + @Override + protected SpecParser getSpecParser() { + return new SpecParser(getPrefix()).setExampleUsage("\"single(target=jclouds:aws-ec2:us-east-1)\""); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e2c57058/core/src/main/java/org/apache/brooklyn/location/basic/SingleMachineProvisioningLocation.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/location/basic/SingleMachineProvisioningLocation.java b/core/src/main/java/org/apache/brooklyn/location/basic/SingleMachineProvisioningLocation.java new file mode 100644 index 0000000..6c40ba3 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/location/basic/SingleMachineProvisioningLocation.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.location.basic; + +import java.util.Map; + +import org.apache.brooklyn.location.NoMachinesAvailableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.brooklyn.location.MachineLocation; +import org.apache.brooklyn.location.MachineProvisioningLocation; +import brooklyn.util.flags.SetFromFlag; + +import com.google.common.collect.ImmutableMap; + +public class SingleMachineProvisioningLocation<T extends MachineLocation> extends FixedListMachineProvisioningLocation<T> { + private static final long serialVersionUID = -4216528515792151062L; + + private static final Logger log = LoggerFactory.getLogger(SingleMachineProvisioningLocation.class); + + @SetFromFlag(nullable=false) + private String location; + + @SetFromFlag(nullable=false) + private Map<?,?> locationFlags; + + private T singleLocation; + private int referenceCount; + private MachineProvisioningLocation<T> provisioningLocation; + + + public SingleMachineProvisioningLocation() { + } + + @SuppressWarnings("rawtypes") + public SingleMachineProvisioningLocation(String location, Map locationFlags) { + this.locationFlags = locationFlags; + this.location = location; + } + + @SuppressWarnings("rawtypes") + @Override + public synchronized T obtain(Map flags) throws NoMachinesAvailableException { + log.info("Flags {} passed to newLocationFromString will be ignored, using {}", flags, locationFlags); + return obtain(); + } + + @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) + public synchronized T obtain() throws NoMachinesAvailableException { + if (singleLocation == null) { + if (provisioningLocation == null) { + provisioningLocation = (MachineProvisioningLocation) getManagementContext().getLocationRegistry().resolve( + location, locationFlags); + } + singleLocation = provisioningLocation.obtain(ImmutableMap.of()); + inUse.add(singleLocation); + } + referenceCount++; + return singleLocation; + } + + @Override + public synchronized void release(T machine) { + if (!machine.equals(singleLocation)) { + throw new IllegalArgumentException("Invalid machine " + machine + " passed to release, expecting: " + singleLocation); + } + if (--referenceCount == 0) { + provisioningLocation.release(machine); + singleLocation = null; + } + inUse.remove(machine); + }; + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e2c57058/core/src/main/java/org/apache/brooklyn/location/basic/SshMachineLocation.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/location/basic/SshMachineLocation.java b/core/src/main/java/org/apache/brooklyn/location/basic/SshMachineLocation.java new file mode 100644 index 0000000..e83712e --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/location/basic/SshMachineLocation.java @@ -0,0 +1,1031 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.location.basic; + +import static brooklyn.util.GroovyJavaMethods.truth; + +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.io.Reader; +import java.io.StringReader; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.security.KeyPair; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.location.MachineDetails; +import org.apache.brooklyn.location.OsDetails; +import org.apache.brooklyn.location.PortRange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.Beta; +import com.google.common.base.Function; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.net.HostAndPort; +import com.google.common.reflect.TypeToken; + +import brooklyn.config.BrooklynLogging; +import brooklyn.config.ConfigKey; +import brooklyn.config.ConfigKey.HasConfigKey; +import brooklyn.config.ConfigUtils; +import brooklyn.entity.basic.BrooklynConfigKeys; +import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.event.basic.MapConfigKey; +import org.apache.brooklyn.location.MachineLocation; +import org.apache.brooklyn.location.PortSupplier; +import org.apache.brooklyn.location.access.PortForwardManager; +import brooklyn.util.ResourceUtils; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.config.ConfigBag; +import brooklyn.util.crypto.SecureKeys; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.exceptions.RuntimeInterruptedException; +import brooklyn.util.file.ArchiveUtils; +import brooklyn.util.flags.SetFromFlag; +import brooklyn.util.flags.TypeCoercions; +import brooklyn.util.guava.KeyTransformingLoadingCache.KeyTransformingSameTypeLoadingCache; +import brooklyn.util.internal.ssh.ShellTool; +import brooklyn.util.internal.ssh.SshException; +import brooklyn.util.internal.ssh.SshTool; +import brooklyn.util.internal.ssh.sshj.SshjTool; +import brooklyn.util.mutex.MutexSupport; +import brooklyn.util.mutex.WithMutexes; +import brooklyn.util.net.Urls; +import brooklyn.util.pool.BasicPool; +import brooklyn.util.pool.Pool; +import brooklyn.util.ssh.BashCommands; +import brooklyn.util.stream.KnownSizeInputStream; +import brooklyn.util.stream.ReaderInputStream; +import brooklyn.util.stream.StreamGobbler; +import brooklyn.util.task.ScheduledTask; +import brooklyn.util.task.Tasks; +import brooklyn.util.task.system.internal.ExecWithLoggingHelpers; +import brooklyn.util.task.system.internal.ExecWithLoggingHelpers.ExecRunner; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; +import groovy.lang.Closure; + +/** + * Operations on a machine that is accessible via ssh. + * <p> + * We expose two ways of running scripts. + * The execCommands method passes lines to bash and is lightweight but fragile. + * The execScript method creates a script on the remote machine. It is portable but heavier. + * <p> + * Additionally there are routines to copyTo, copyFrom; and installTo (which tries a curl, and falls back to copyTo + * in event the source is accessible by the caller only). + */ +public class SshMachineLocation extends AbstractLocation implements MachineLocation, PortSupplier, WithMutexes, Closeable { + + /** @deprecated since 0.7.0 shouldn't be public */ + public static final Logger LOG = LoggerFactory.getLogger(SshMachineLocation.class); + /** @deprecated since 0.7.0 shouldn't be public */ + public static final Logger logSsh = LoggerFactory.getLogger(BrooklynLogging.SSH_IO); + + // Use a sane timeout when doing a connectivity test + private static final int SSHABLE_CONNECT_TIMEOUT = (int)Duration.minutes(2).toMilliseconds(); + + public static final ConfigKey<Duration> SSH_CACHE_EXPIRY_DURATION = ConfigKeys.newConfigKey(Duration.class, + "sshCacheExpiryDuration", "Expiry time for unused cached ssh connections", Duration.FIVE_MINUTES); + + public static final ConfigKey<MachineDetails> MACHINE_DETAILS = ConfigKeys.newConfigKey( + MachineDetails.class, + "machineDetails"); + + public static final ConfigKey<Boolean> DETECT_MACHINE_DETAILS = ConfigKeys.newBooleanConfigKey("detectMachineDetails", + "Attempt to detect machine details automatically. Works with SSH-accessible Linux instances.", true); + + public static final ConfigKey<Iterable<String>> PRIVATE_ADDRESSES = ConfigKeys.newConfigKey( + new TypeToken<Iterable<String>>() {}, + "privateAddresses", + "Private addresses of this machine, e.g. those within the private network", + null); + + public static final ConfigKey<Map<Integer, String>> TCP_PORT_MAPPINGS = ConfigKeys.newConfigKey( + new TypeToken<Map<Integer, String>>() {}, + "tcpPortMappings", + "NAT'ed ports, giving the mapping from private TCP port to a public host:port", + null); + + @SetFromFlag + protected String user; + + @SetFromFlag(nullable = false) + protected InetAddress address; + + // TODO should not allow this to be set from flag; it is not persisted so that will be lost + // (mainly used for localhost currently so not a big problem) + @Nullable // lazily initialized; use getMutexSupport() + @SetFromFlag + private transient WithMutexes mutexSupport; + + @SetFromFlag + private Set<Integer> usedPorts; + + private volatile MachineDetails machineDetails; + private final Object machineDetailsLock = new Object(); + + public static final ConfigKey<String> SSH_HOST = BrooklynConfigKeys.SSH_CONFIG_HOST; + public static final ConfigKey<Integer> SSH_PORT = BrooklynConfigKeys.SSH_CONFIG_PORT; + + public static final ConfigKey<String> SSH_EXECUTABLE = ConfigKeys.newStringConfigKey("sshExecutable", + "Allows an `ssh` executable file to be specified, to be used in place of the default (programmatic) java ssh client"); + public static final ConfigKey<String> SCP_EXECUTABLE = ConfigKeys.newStringConfigKey("scpExecutable", + "Allows an `scp` executable file to be specified, to be used in place of the default (programmatic) java ssh client"); + + // TODO remove + public static final ConfigKey<String> PASSWORD = SshTool.PROP_PASSWORD; + public static final ConfigKey<String> PRIVATE_KEY_FILE = SshTool.PROP_PRIVATE_KEY_FILE; + public static final ConfigKey<String> PRIVATE_KEY_DATA = SshTool.PROP_PRIVATE_KEY_DATA; + public static final ConfigKey<String> PRIVATE_KEY_PASSPHRASE = SshTool.PROP_PRIVATE_KEY_PASSPHRASE; + + public static final ConfigKey<String> SCRIPT_DIR = ConfigKeys.newStringConfigKey( + "scriptDir", "directory where scripts should be placed and executed on the SSH target machine"); + public static final ConfigKey<Map<String,Object>> SSH_ENV_MAP = new MapConfigKey<Object>( + Object.class, "env", "environment variables to pass to the remote SSH shell session"); + + public static final ConfigKey<Boolean> ALLOCATE_PTY = SshTool.PROP_ALLOCATE_PTY; + + public static final ConfigKey<OutputStream> STDOUT = new BasicConfigKey<OutputStream>(OutputStream.class, "out"); + public static final ConfigKey<OutputStream> STDERR = new BasicConfigKey<OutputStream>(OutputStream.class, "err"); + public static final ConfigKey<Boolean> NO_STDOUT_LOGGING = ConfigKeys.newBooleanConfigKey( + "noStdoutLogging", "whether to disable logging of stdout from SSH commands (e.g. for verbose commands)", false); + public static final ConfigKey<Boolean> NO_STDERR_LOGGING = ConfigKeys.newBooleanConfigKey( + "noStderrLogging", "whether to disable logging of stderr from SSH commands (e.g. for verbose commands)", false); + public static final ConfigKey<String> LOG_PREFIX = ConfigKeys.newStringConfigKey("logPrefix"); + + public static final ConfigKey<String> LOCAL_TEMP_DIR = SshTool.PROP_LOCAL_TEMP_DIR; + + public static final ConfigKey<Boolean> CLOSE_CONNECTION = ConfigKeys.newBooleanConfigKey("close", "Close the SSH connection after use", false); + public static final ConfigKey<String> UNIQUE_ID = ConfigKeys.newStringConfigKey("unique", "Unique ID for the SSH connection"); + + /** + * Specifies config keys where a change in the value does not require a new SshTool instance, + * i.e. they can be specified per command on the tool + */ + // TODO: Fully specify. + public static final Set<ConfigKey<?>> REUSABLE_SSH_PROPS = ImmutableSet.of( + STDOUT, STDERR, SCRIPT_DIR, CLOSE_CONNECTION, + SshTool.PROP_SCRIPT_HEADER, SshTool.PROP_PERMISSIONS, SshTool.PROP_LAST_MODIFICATION_DATE, + SshTool.PROP_LAST_ACCESS_DATE, SshTool.PROP_OWNER_UID, SshTool.PROP_SSH_RETRY_DELAY); + + public static final Set<HasConfigKey<?>> ALL_SSH_CONFIG_KEYS = + ImmutableSet.<HasConfigKey<?>>builder() + .addAll(ConfigUtils.getStaticKeysOnClass(SshMachineLocation.class)) + .addAll(ConfigUtils.getStaticKeysOnClass(SshTool.class)) + .build(); + + public static final Set<String> ALL_SSH_CONFIG_KEY_NAMES = + ImmutableSet.copyOf(Iterables.transform(ALL_SSH_CONFIG_KEYS, new Function<HasConfigKey<?>,String>() { + @Override + public String apply(HasConfigKey<?> input) { + return input.getConfigKey().getName(); + } + })); + + /** + * The set of config keys on this location which become default values for properties when invoking an SSH + * operation. + */ + @Beta + public static final Set<ConfigKey<?>> SSH_CONFIG_GIVEN_TO_PROPS = ImmutableSet.<ConfigKey<?>>of( + SCRIPT_DIR); + + private Task<?> cleanupTask; + /** callers should use {@link #getSshPoolCache()} */ + @Nullable + private transient LoadingCache<Map<String, ?>, Pool<SshTool>> sshPoolCacheOrNull; + + public SshMachineLocation() { + this(MutableMap.of()); + } + + public SshMachineLocation(Map properties) { + super(properties); + usedPorts = (usedPorts != null) ? Sets.newLinkedHashSet(usedPorts) : Sets.<Integer>newLinkedHashSet(); + } + + @Override + public void init() { + super.init(); + + // Register any pre-existing port-mappings with the PortForwardManager + Map<Integer, String> tcpPortMappings = getConfig(TCP_PORT_MAPPINGS); + if (tcpPortMappings != null) { + PortForwardManager pfm = (PortForwardManager) getManagementContext().getLocationRegistry().resolve("portForwardManager(scope=global)"); + for (Map.Entry<Integer, String> entry : tcpPortMappings.entrySet()) { + int targetPort = entry.getKey(); + HostAndPort publicEndpoint = HostAndPort.fromString(entry.getValue()); + if (!publicEndpoint.hasPort()) { + throw new IllegalArgumentException("Invalid portMapping ('"+entry.getValue()+"') for port "+targetPort+" in machine "+this); + } + pfm.associate(publicEndpoint.getHostText(), publicEndpoint, this, targetPort); + } + } + } + + private final transient Object poolCacheMutex = new Object(); + @Nonnull + private LoadingCache<Map<String, ?>, Pool<SshTool>> getSshPoolCache() { + synchronized (poolCacheMutex) { + if (sshPoolCacheOrNull==null) { + sshPoolCacheOrNull = buildSshToolPoolCacheLoader(); + addSshPoolCacheCleanupTask(); + } + } + return sshPoolCacheOrNull; + } + + private LoadingCache<Map<String, ?>, Pool<SshTool>> buildSshToolPoolCacheLoader() { + // TODO: Appropriate numbers for maximum size and expire after access + // At the moment every SshMachineLocation instance creates its own pool. + // It might make more sense to create one pool and inject it into all SshMachineLocations. + Duration expiryDuration = getConfig(SSH_CACHE_EXPIRY_DURATION); + + LoadingCache<Map<String, ?>, Pool<SshTool>> delegate = CacheBuilder.newBuilder() + .maximumSize(10) + .expireAfterAccess(expiryDuration.toMilliseconds(), TimeUnit.MILLISECONDS) + .recordStats() + .removalListener(new RemovalListener<Map<String, ?>, Pool<SshTool>>() { + // TODO: Does it matter that this is synchronous? - Can closing pools cause long delays? + @Override + public void onRemoval(RemovalNotification<Map<String, ?>, Pool<SshTool>> notification) { + Pool<SshTool> removed = notification.getValue(); + if (removed == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Pool evicted from SshTool cache is null so we can't call pool.close(). " + + "It's probably already been garbage collected. Eviction cause: {} ", + notification.getCause().name()); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("{} evicted from SshTool cache. Eviction cause: {}", + removed, notification.getCause().name()); + } + try { + removed.close(); + } catch (IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Exception closing "+removed, e); + } + } + } + } + }) + .build(new CacheLoader<Map<String, ?>, Pool<SshTool>>() { + public Pool<SshTool> load(Map<String, ?> properties) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} building ssh pool for {} with properties: {}", + new Object[] {this, getSshHostAndPort(), properties}); + } + return buildPool(properties); + } + }); + + final Set<String> reusableSshProperties = ImmutableSet.copyOf( + Iterables.transform(REUSABLE_SSH_PROPS, new Function<ConfigKey<?>, String>() { + @Override public String apply(ConfigKey<?> input) { + return input.getName(); + } + })); + // Groovy-eclipse compiler refused to compile `KeyTransformingSameTypeLoadingCache.from(...)` + return new KeyTransformingSameTypeLoadingCache<Map<String, ?>, Pool<SshTool>>( + delegate, + new Function<Map<String, ?>, Map<String, ?>>() { + @Override + public Map<String, ?> apply(@Nullable Map<String, ?> input) { + Map<String, Object> copy = new HashMap<String, Object>(input); + copy.keySet().removeAll(reusableSshProperties); + return copy; + } + }); + } + + private BasicPool<SshTool> buildPool(final Map<String, ?> properties) { + return BasicPool.<SshTool>builder() + .name(getDisplayName()+"@"+address+":"+getPort()+ + (config().getRaw(SSH_HOST).isPresent() ? "("+getConfig(SSH_HOST)+":"+getConfig(SSH_PORT)+")" : "")+ + ":hash"+System.identityHashCode(this)) + .supplier(new Supplier<SshTool>() { + @Override public SshTool get() { + return connectSsh(properties); + }}) + .viabilityChecker(new Predicate<SshTool>() { + @Override public boolean apply(SshTool input) { + return input != null && input.isConnected(); + }}) + .closer(new Function<SshTool,Void>() { + @Override public Void apply(SshTool input) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} closing pool for {}", this, input); + } + try { + input.disconnect(); + } catch (Exception e) { + if (logSsh.isDebugEnabled()) logSsh.debug("On machine "+SshMachineLocation.this+", ssh-disconnect failed", e); + } + return null; + }}) + .build(); + } + + @Override + public SshMachineLocation configure(Map<?,?> properties) { + super.configure(properties); + + // TODO Note that check for addresss!=null is done automatically in super-constructor, in FlagUtils.checkRequiredFields + // Yikes, dangerous code for accessing fields of sub-class in super-class' constructor! But getting away with it so far! + + boolean deferConstructionChecks = (properties.containsKey("deferConstructionChecks") && TypeCoercions.coerce(properties.get("deferConstructionChecks"), Boolean.class)); + if (!deferConstructionChecks) { + if (getDisplayName() == null) { + setDisplayName((truth(user) ? user+"@" : "") + address.getHostName()); + } + } + return this; + } + + private transient final Object mutexSupportCreationLock = new Object(); + protected WithMutexes getMutexSupport() { + synchronized (mutexSupportCreationLock) { + // create on demand so that it is not null after serialization + if (mutexSupport == null) { + mutexSupport = new MutexSupport(); + } + return mutexSupport; + } + } + + protected void addSshPoolCacheCleanupTask() { + if (cleanupTask!=null && !cleanupTask.isDone()) { + return; + } + if (getManagementContext()==null || getManagementContext().getExecutionManager()==null) { + LOG.debug("No management context for "+this+"; ssh-pool cache will only be closed when machine is closed"); + return; + } + + Callable<Task<?>> cleanupTaskFactory = new Callable<Task<?>>() { + @Override public Task<Void> call() { + return Tasks.<Void>builder().dynamic(false).tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) + .name("ssh-location cache cleaner").body(new Callable<Void>() { + @Override public Void call() { + try { + if (sshPoolCacheOrNull != null) sshPoolCacheOrNull.cleanUp(); + if (!SshMachineLocation.this.isManaged()) { + if (sshPoolCacheOrNull != null) sshPoolCacheOrNull.invalidateAll(); + cleanupTask.cancel(false); + sshPoolCacheOrNull = null; + } + return null; + } catch (Exception e) { + // Don't rethrow: the behaviour of executionManager is different from a scheduledExecutorService, + // if we throw an exception, then our task will never get executed again + LOG.warn("Problem cleaning up ssh-pool-cache", e); + return null; + } catch (Throwable t) { + LOG.warn("Problem cleaning up ssh-pool-cache (rethrowing)", t); + throw Exceptions.propagate(t); + } + }}).build(); + } + }; + + Duration expiryDuration = getConfig(SSH_CACHE_EXPIRY_DURATION); + cleanupTask = getManagementContext().getExecutionManager().submit(new ScheduledTask( + MutableMap.of("displayName", "scheduled[ssh-location cache cleaner]"), cleanupTaskFactory).period(expiryDuration)); + } + + // TODO close has been used for a long time to perform clean-up wanted on unmanagement, but that's not clear; + // we should probably expose a mechanism such as that in Entity (or re-use Entity for locations!) + @Override + public void close() throws IOException { + if (sshPoolCacheOrNull != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} invalidating all entries in ssh pool cache. Final stats: {}", this, sshPoolCacheOrNull.stats()); + } + sshPoolCacheOrNull.invalidateAll(); + } + if (cleanupTask != null) { + cleanupTask.cancel(false); + cleanupTask = null; + sshPoolCacheOrNull = null; + } + } + + // should not be necessary, and causes objects to be kept around a lot longer than desired +// @Override +// protected void finalize() throws Throwable { +// try { +// close(); +// } finally { +// super.finalize(); +// } +// } + + @Override + public InetAddress getAddress() { + return address; + } + + @Override + public String getHostname() { + String hostname = address.getHostName(); + return (hostname == null || hostname.equals(address.getHostAddress())) ? null : hostname; + } + + @Override + public Set<String> getPublicAddresses() { + return ImmutableSet.of(address.getHostAddress()); + } + + @Override + public Set<String> getPrivateAddresses() { + Iterable<String> result = getConfig(PRIVATE_ADDRESSES); + return (result == null) ? ImmutableSet.<String>of() : ImmutableSet.copyOf(result); + } + + public HostAndPort getSshHostAndPort() { + String host = getConfig(SSH_HOST); + if (host == null || Strings.isEmpty(host)) + host = address.getHostName(); + Integer port = getConfig(SSH_PORT); + if (port == null || port == 0) + port = 22; + return HostAndPort.fromParts(host, port); + } + + public String getUser() { + if (!truth(user)) { + if (config().getLocalRaw(SshTool.PROP_USER).isPresent()) { + LOG.warn("User configuration for "+this+" set after deployment; deprecated behaviour may not be supported in future versions"); + } + return getConfig(SshTool.PROP_USER); + } + return user; + } + + /** port for SSHing */ + public int getPort() { + return getConfig(SshTool.PROP_PORT); + } + + protected <T> T execSsh(final Map<String, ?> props, final Function<ShellTool, T> task) { + final LoadingCache<Map<String, ?>, Pool<SshTool>> sshPoolCache = getSshPoolCache(); + Pool<SshTool> pool = sshPoolCache.getUnchecked(props); + if (LOG.isTraceEnabled()) { + LOG.trace("{} execSsh got pool: {}", this, pool); + } + + if (truth(props.get(CLOSE_CONNECTION.getName()))) { + Function<SshTool, T> close = new Function<SshTool, T>() { + @Override + public T apply(SshTool input) { + T result = task.apply(input); + if (LOG.isDebugEnabled()) { + LOG.debug("{} invalidating all sshPoolCache entries: {}", SshMachineLocation.this, sshPoolCache.stats().toString()); + } + sshPoolCache.invalidateAll(); + sshPoolCache.cleanUp(); + return result; + } + }; + return pool.exec(close); + } else { + return pool.exec(task); + } + } + + protected SshTool connectSsh() { + return connectSsh(ImmutableMap.of()); + } + + protected boolean previouslyConnected = false; + protected SshTool connectSsh(Map props) { + try { + if (!truth(user)) { + String newUser = getUser(); + if (LOG.isTraceEnabled()) LOG.trace("For "+this+", setting user in connectSsh: oldUser="+user+"; newUser="+newUser); + user = newUser; + } + + ConfigBag args = new ConfigBag() + .configure(SshTool.PROP_USER, user) + // default value of host, overridden if SSH_HOST is supplied + .configure(SshTool.PROP_HOST, address.getHostName()); + + for (Map.Entry<String,Object> entry: config().getBag().getAllConfig().entrySet()) { + String key = entry.getKey(); + if (key.startsWith(SshTool.BROOKLYN_CONFIG_KEY_PREFIX)) { + key = Strings.removeFromStart(key, SshTool.BROOKLYN_CONFIG_KEY_PREFIX); + } else if (ALL_SSH_CONFIG_KEY_NAMES.contains(entry.getKey())) { + // key should be included, and does not need to be changed + + // TODO make this config-setting mechanism more universal + // currently e.g. it will not admit a tool-specific property. + // thinking either we know about the tool here, + // or we don't allow unadorned keys to be set + // (require use of BROOKLYN_CONFIG_KEY_PREFIX) + } else { + // this key is not applicable here; ignore it + continue; + } + args.putStringKey(key, entry.getValue()); + } + + // Explicit props trump all. + args.putAll(props); + + if (LOG.isTraceEnabled()) LOG.trace("creating ssh session for "+args); + if (!user.equals(args.get(SshTool.PROP_USER))) { + LOG.warn("User mismatch configuring ssh for "+this+": preferring user "+args.get(SshTool.PROP_USER)+" over "+user); + user = args.get(SshTool.PROP_USER); + } + + // look up tool class + String sshToolClass = args.get(SshTool.PROP_TOOL_CLASS); + if (sshToolClass==null) sshToolClass = SshjTool.class.getName(); + SshTool ssh = (SshTool) Class.forName(sshToolClass).getConstructor(Map.class).newInstance(args.getAllConfig()); + + if (LOG.isTraceEnabled()) LOG.trace("using ssh-tool {} (of type {}); props ", ssh, sshToolClass); + + Tasks.setBlockingDetails("Opening ssh connection"); + try { ssh.connect(); } finally { Tasks.setBlockingDetails(null); } + previouslyConnected = true; + return ssh; + } catch (Exception e) { + if (previouslyConnected) throw Throwables.propagate(e); + // subsequence connection (above) most likely network failure, our remarks below won't help + // on first connection include additional information if we can't connect, to help with debugging + String rootCause = Throwables.getRootCause(e).getMessage(); + throw new IllegalStateException("Cannot establish ssh connection to "+user+" @ "+this+ + (rootCause!=null && !rootCause.isEmpty() ? " ("+rootCause+")" : "")+". \n"+ + "Ensure that passwordless and passphraseless ssh access is enabled using standard keys from ~/.ssh or " + + "as configured in brooklyn.properties. " + + "Check that the target host is accessible, " + + "that credentials are correct (location and permissions if using a key), " + + "that the SFTP subsystem is available on the remote side, " + + "and that there is sufficient random noise in /dev/random on both ends. " + + "To debug less common causes, see the original error in the trace or log, and/or enable 'net.schmizz' (sshj) logging." + , e); + } + } + + // TODO submitCommands and submitScript which submit objects we can subsequently poll (cf JcloudsSshMachineLocation.submitRunScript) + + /** + * Executes a set of commands, directly on the target machine (no wrapping in script). + * Joined using {@literal ;} by default. + * <p> + * Stdout and stderr will be logged automatically to brooklyn.SSH logger, unless the + * flags 'noStdoutLogging' and 'noStderrLogging' are set. To set a logging prefix, use + * the flag 'logPrefix'. + * <p> + * Currently runs the commands in an interactive/login shell + * by passing each as a line to bash. To terminate early, use: + * <pre> + * foo || exit 1 + * </pre> + * It may be desirable instead, in some situations, to wrap as: + * <pre> + * { line1 ; } && { line2 ; } ... + * </pre> + * and run as a single command (possibly not as an interacitve/login + * shell) causing the script to exit on the first command which fails. + * <p> + * Currently this has to be done by the caller. + * (If desired we can add a flag {@code exitIfAnyNonZero} to support this mode, + * and/or {@code commandPrepend} and {@code commandAppend} similar to + * (currently supported in SshjTool) {@code separator}.) + */ + public int execCommands(String summaryForLogging, List<String> commands) { + return execCommands(MutableMap.<String,Object>of(), summaryForLogging, commands, MutableMap.<String,Object>of()); + } + public int execCommands(Map<String,?> props, String summaryForLogging, List<String> commands) { + return execCommands(props, summaryForLogging, commands, MutableMap.<String,Object>of()); + } + public int execCommands(String summaryForLogging, List<String> commands, Map<String,?> env) { + return execCommands(MutableMap.<String,Object>of(), summaryForLogging, commands, env); + } + public int execCommands(Map<String,?> props, String summaryForLogging, List<String> commands, Map<String,?> env) { + return newExecWithLoggingHelpers().execCommands(augmentPropertiesWithSshConfigGivenToProps(props), summaryForLogging, commands, env); + } + + /** + * Executes a set of commands, wrapped as a script sent to the remote machine. + * <p> + * Stdout and stderr will be logged automatically to brooklyn.SSH logger, unless the + * flags 'noStdoutLogging' and 'noStderrLogging' are set. To set a logging prefix, use + * the flag 'logPrefix'. + */ + public int execScript(String summaryForLogging, List<String> commands) { + return execScript(MutableMap.<String,Object>of(), summaryForLogging, commands, MutableMap.<String,Object>of()); + } + public int execScript(Map<String,?> props, String summaryForLogging, List<String> commands) { + return execScript(props, summaryForLogging, commands, MutableMap.<String,Object>of()); + } + public int execScript(String summaryForLogging, List<String> commands, Map<String,?> env) { + return execScript(MutableMap.<String,Object>of(), summaryForLogging, commands, env); + } + public int execScript(Map<String,?> props, String summaryForLogging, List<String> commands, Map<String,?> env) { + return newExecWithLoggingHelpers().execScript(augmentPropertiesWithSshConfigGivenToProps(props), summaryForLogging, commands, env); + } + + private Map<String, Object> augmentPropertiesWithSshConfigGivenToProps(Map<String, ?> props) { + Map<String,Object> augmentedProps = Maps.newHashMap(props); + for (ConfigKey<?> config : SSH_CONFIG_GIVEN_TO_PROPS) { + if (!augmentedProps.containsKey(config.getName()) && hasConfig(config, true)) + augmentedProps.put(config.getName(), getConfig(config)); + } + return augmentedProps; + } + + protected ExecWithLoggingHelpers newExecWithLoggingHelpers() { + return new ExecWithLoggingHelpers("SSH") { + @Override + protected <T> T execWithTool(MutableMap<String, Object> props, Function<ShellTool, T> function) { + return execSsh(props, function); + } + @Override + protected void preExecChecks() { + Preconditions.checkNotNull(address, "host address must be specified for ssh"); + } + @Override + protected String constructDefaultLoggingPrefix(ConfigBag execFlags) { + String hostname = getAddress().getHostName(); + Integer port = execFlags.peek(SshTool.PROP_PORT); + if (port == null) port = getConfig(ConfigUtils.prefixedKey(SshTool.BROOKLYN_CONFIG_KEY_PREFIX, SshTool.PROP_PORT)); + return (user != null ? user+"@" : "") + hostname + (port != null ? ":"+port : ""); + } + @Override + protected String getTargetName() { + return ""+SshMachineLocation.this; + } + }.logger(logSsh); + } + + /** + * @deprecated since 0.7.0; use {@link #execCommands(Map, String, List, Map), and rely on that calling the execWithLogging + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + @Deprecated + protected int execWithLogging(Map<String,?> props, String summaryForLogging, List<String> commands, Map env, final Closure<Integer> execCommand) { + return newExecWithLoggingHelpers().execWithLogging(props, summaryForLogging, commands, env, new ExecRunner() { + @Override public int exec(ShellTool ssh, Map<String, ?> flags, List<String> cmds, Map<String, ?> env) { + return execCommand.call(ssh, flags, cmds, env); + }}); + } + + public int copyTo(File src, File destination) { + return copyTo(MutableMap.<String,Object>of(), src, destination); + } + public int copyTo(Map<String,?> props, File src, File destination) { + return copyTo(props, src, destination.getPath()); + } + + public int copyTo(File src, String destination) { + return copyTo(MutableMap.<String,Object>of(), src, destination); + } + public int copyTo(Map<String,?> props, File src, String destination) { + Preconditions.checkNotNull(address, "Host address must be specified for scp"); + Preconditions.checkArgument(src.exists(), "File %s must exist for scp", src.getPath()); + try { + return copyTo(props, new FileInputStream(src), src.length(), destination); + } catch (FileNotFoundException e) { + throw Throwables.propagate(e); + } + } + public int copyTo(Reader src, String destination) { + return copyTo(MutableMap.<String,Object>of(), src, destination); + } + public int copyTo(Map<String,?> props, Reader src, String destination) { + return copyTo(props, new ReaderInputStream(src), destination); + } + public int copyTo(InputStream src, String destination) { + return copyTo(MutableMap.<String,Object>of(), src, destination); + } + public int copyTo(InputStream src, long filesize, String destination) { + return copyTo(MutableMap.<String,Object>of(), src, filesize, destination); + } + // FIXME the return code is not a reliable indicator of success or failure + public int copyTo(final Map<String,?> props, final InputStream src, final long filesize, final String destination) { + if (filesize == -1) { + return copyTo(props, src, destination); + } else { + return execSsh(props, new Function<ShellTool,Integer>() { + public Integer apply(ShellTool ssh) { + return ((SshTool) ssh).copyToServer(props, new KnownSizeInputStream(src, filesize), destination); + }}); + } + } + // FIXME the return code is not a reliable indicator of success or failure + // Closes input stream before returning + public int copyTo(final Map<String,?> props, final InputStream src, final String destination) { + return execSsh(props, new Function<ShellTool,Integer>() { + public Integer apply(ShellTool ssh) { + return ((SshTool)ssh).copyToServer(props, src, destination); + }}); + } + + // FIXME the return code is not a reliable indicator of success or failure + public int copyFrom(String remote, String local) { + return copyFrom(MutableMap.<String,Object>of(), remote, local); + } + public int copyFrom(final Map<String,?> props, final String remote, final String local) { + return execSsh(props, new Function<ShellTool,Integer>() { + public Integer apply(ShellTool ssh) { + return ((SshTool)ssh).copyFromServer(props, remote, new File(local)); + }}); + } + + public int installTo(String url, String destPath) { + return installTo(MutableMap.<String, Object>of(), url, destPath); + } + + public int installTo(Map<String,?> props, String url, String destPath) { + return installTo(ResourceUtils.create(this), props, url, destPath); + } + + public int installTo(ResourceUtils loader, String url, String destPath) { + return installTo(loader, MutableMap.<String, Object>of(), url, destPath); + } + + /** + * Installs the given URL at the indicated destination path. + * <p> + * Attempts to curl the source URL on the remote machine, + * then if that fails, loads locally (from classpath or file) and transfers. + * <p> + * Use {@link ArchiveUtils} to handle directories and their contents properly. + * + * TODO allow s3://bucket/file URIs for AWS S3 resources + * TODO use PAX-URL style URIs for maven artifacts + * TODO use subtasks here for greater visibility?; deprecate in favour of SshTasks.installFromUrl? + * + * @param utils A {@link ResourceUtils} that can resolve the source URLs + * @param url The source URL to be installed + * @param destPath The file to be created on the destination + * + * @see ArchiveUtils#deploy(String, SshMachineLocation, String) + * @see ArchiveUtils#deploy(String, SshMachineLocation, String, String) + * @see ResourceUtils#getResourceFromUrl(String) + */ + public int installTo(ResourceUtils utils, Map<String,?> props, String url, String destPath) { + LOG.debug("installing {} to {} on {}, attempting remote curl", new Object[] { url, destPath, this }); + + try { + PipedInputStream insO = new PipedInputStream(); OutputStream outO = new PipedOutputStream(insO); + PipedInputStream insE = new PipedInputStream(); OutputStream outE = new PipedOutputStream(insE); + StreamGobbler sgsO = new StreamGobbler(insO, null, LOG); sgsO.setLogPrefix("[curl @ "+address+":stdout] ").start(); + StreamGobbler sgsE = new StreamGobbler(insE, null, LOG); sgsE.setLogPrefix("[curl @ "+address+":stdout] ").start(); + Map<String, ?> sshProps = MutableMap.<String, Object>builder().putAll(props).put("out", outO).put("err", outE).build(); + int result = execScript(sshProps, "copying remote resource "+url+" to server", ImmutableList.of( + BashCommands.INSTALL_CURL, // TODO should hold the 'installing' mutex + "mkdir -p `dirname '"+destPath+"'`", + "curl "+url+" -L --silent --insecure --show-error --fail --connect-timeout 60 --max-time 600 --retry 5 -o '"+destPath+"'")); + sgsO.close(); + sgsE.close(); + if (result != 0) { + LOG.debug("installing {} to {} on {}, curl failed, attempting local fetch and copy", new Object[] { url, destPath, this }); + try { + Tasks.setBlockingDetails("retrieving resource "+url+" for copying across"); + InputStream stream = utils.getResourceFromUrl(url); + Tasks.setBlockingDetails("copying resource "+url+" to server"); + result = copyTo(props, stream, destPath); + } finally { + Tasks.setBlockingDetails(null); + } + } + if (result == 0) { + LOG.debug("installing {} complete; {} on {}", new Object[] { url, destPath, this }); + } else { + LOG.warn("installing {} failed; {} on {}: {}", new Object[] { url, destPath, this, result }); + } + return result; + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @Override + public String toString() { + return "SshMachineLocation["+getDisplayName()+":"+address+":"+getPort()+"@"+getId()+"]"; + } + + @Override + public String toVerboseString() { + return Objects.toStringHelper(this).omitNullValues() + .add("id", getId()).add("name", getDisplayName()) + .add("user", getUser()).add("address", getAddress()).add("port", getPort()) + .add("parentLocation", getParent()) + .toString(); + } + + /** + * @see #obtainPort(PortRange) + * @see PortRanges#ANY_HIGH_PORT + */ + @Override + public boolean obtainSpecificPort(int portNumber) { + synchronized (usedPorts) { + // TODO Does not yet check if the port really is free on this machine + if (usedPorts.contains(portNumber)) { + return false; + } else { + usedPorts.add(portNumber); + return true; + } + } + } + + @Override + public int obtainPort(PortRange range) { + synchronized (usedPorts) { + for (int p: range) + if (obtainSpecificPort(p)) return p; + if (LOG.isDebugEnabled()) LOG.debug("unable to find port in {} on {}; returning -1", range, this); + return -1; + } + } + + @Override + public void releasePort(int portNumber) { + synchronized (usedPorts) { + usedPorts.remove((Object) portNumber); + } + } + + public boolean isSshable() { + String cmd = "date"; + try { + try { + Socket s = new Socket(); + s.connect(new InetSocketAddress(getAddress(), getPort()), SSHABLE_CONNECT_TIMEOUT); + s.close(); + } catch (IOException e) { + if (LOG.isDebugEnabled()) LOG.debug(""+this+" not [yet] reachable (socket "+getAddress()+":"+getPort()+"): "+e); + return false; + } + // this should do execCommands because sftp subsystem might not be available (or sometimes seems to take a while for it to become so?) + int result = execCommands(MutableMap.<String,Object>of(), "isSshable", ImmutableList.of(cmd)); + if (result == 0) { + return true; + } else { + if (LOG.isDebugEnabled()) LOG.debug("Not reachable: {}, executing `{}`, exit code {}", new Object[] {this, cmd, result}); + return false; + } + } catch (SshException e) { + if (LOG.isDebugEnabled()) LOG.debug("Exception checking if "+this+" is reachable; assuming not", e); + return false; + } catch (IllegalStateException e) { + if (LOG.isDebugEnabled()) LOG.debug("Exception checking if "+this+" is reachable; assuming not", e); + return false; + } catch (RuntimeException e) { + if (Exceptions.getFirstThrowableOfType(e, IOException.class) != null) { + if (LOG.isDebugEnabled()) LOG.debug("Exception checking if "+this+" is reachable; assuming not", e); + return false; + } else { + throw e; + } + } + } + + @Override + public OsDetails getOsDetails() { + return getMachineDetails().getOsDetails(); + } + + @Override + public MachineDetails getMachineDetails() { + synchronized (machineDetailsLock) { + if (machineDetails == null) { + machineDetails = getConfig(MACHINE_DETAILS); + } + if (machineDetails == null) { + machineDetails = inferMachineDetails(); + } + } + return machineDetails; + } + + protected MachineDetails inferMachineDetails() { + boolean detectionEnabled = getConfig(DETECT_MACHINE_DETAILS); + if (!detectionEnabled) + return new BasicMachineDetails(new BasicHardwareDetails(-1, -1), new BasicOsDetails("UNKNOWN", "UNKNOWN", "UNKNOWN")); + + Tasks.setBlockingDetails("Waiting for machine details"); + try { + return BasicMachineDetails.forSshMachineLocation(this); + } finally { + Tasks.resetBlockingDetails(); + } + } + + @Override + public void acquireMutex(String mutexId, String description) throws RuntimeInterruptedException { + try { + getMutexSupport().acquireMutex(mutexId, description); + } catch (InterruptedException ie) { + throw new RuntimeInterruptedException("Interrupted waiting for mutex: " + mutexId, ie); + } + } + + @Override + public boolean tryAcquireMutex(String mutexId, String description) { + return getMutexSupport().tryAcquireMutex(mutexId, description); + } + + @Override + public void releaseMutex(String mutexId) { + getMutexSupport().releaseMutex(mutexId); + } + + @Override + public boolean hasMutex(String mutexId) { + return getMutexSupport().hasMutex(mutexId); + } + + //We want the SshMachineLocation to be serializable and therefore the pool needs to be dealt with correctly. + //In this case we are not serializing the pool (we made the field transient) and create a new pool when deserialized. + //This fix is currently needed for experiments, but isn't used in normal Brooklyn usage. + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + getSshPoolCache(); + } + + /** returns the un-passphrased key-pair info if a key is being used, or else null */ + public KeyPair findKeyPair() { + String fn = getConfig(SshTool.PROP_PRIVATE_KEY_FILE); + ResourceUtils r = ResourceUtils.create(this); + if (fn!=null) return SecureKeys.readPem(r.getResourceFromUrl(fn), getConfig(SshTool.PROP_PRIVATE_KEY_PASSPHRASE)); + String data = getConfig(SshTool.PROP_PRIVATE_KEY_DATA); + if (data!=null) return SecureKeys.readPem(new ReaderInputStream(new StringReader(data)), getConfig(SshTool.PROP_PRIVATE_KEY_PASSPHRASE)); + if (findPassword()!=null) + // if above not specified, and password is, use password + return null; + // fall back to id_rsa and id_dsa + if (new File( Urls.mergePaths(System.getProperty("user.home"), ".ssh/id_rsa") ).exists() ) + return SecureKeys.readPem(r.getResourceFromUrl("~/.ssh/id_rsa"), getConfig(SshTool.PROP_PRIVATE_KEY_PASSPHRASE)); + if (new File( Urls.mergePaths(System.getProperty("user.home"), ".ssh/id_dsa") ).exists() ) + return SecureKeys.readPem(r.getResourceFromUrl("~/.ssh/id_dsa"), getConfig(SshTool.PROP_PRIVATE_KEY_PASSPHRASE)); + LOG.warn("Unable to extract any key or passphrase data in request to findKeyPair for "+this); + return null; + } + + /** returns the password being used to log in, if a password is being used, or else null */ + public String findPassword() { + return getConfig(SshTool.PROP_PASSWORD); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e2c57058/core/src/main/java/org/apache/brooklyn/location/basic/SupportsPortForwarding.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/location/basic/SupportsPortForwarding.java b/core/src/main/java/org/apache/brooklyn/location/basic/SupportsPortForwarding.java new file mode 100644 index 0000000..8c26506 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/location/basic/SupportsPortForwarding.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.location.basic; + +import brooklyn.util.net.Cidr; + +import com.google.common.net.HostAndPort; + +public interface SupportsPortForwarding { + + /** returns an endpoint suitable for contacting the indicated private port on this object, + * from the given Cidr, creating it if necessary and possible; + * may return null if forwarding not available + */ + public HostAndPort getSocketEndpointFor(Cidr accessor, int privatePort); + + /** marker on a location to indicate that port forwarding should be done automatically + * for attempts to access from Brooklyn + */ + public interface RequiresPortForwarding extends SupportsPortForwarding { + } + +}
