http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirror.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirror.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirror.java new file mode 100644 index 0000000..ac884fa --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirror.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.brooklynnode; + +import java.util.Map; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.ImplementedBy; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.entity.brooklynnode.BrooklynNode; +import org.apache.brooklyn.sensor.core.Sensors; +import org.apache.brooklyn.util.time.Duration; + +/** Provides an entity which can sit in one brooklyn domain and reflect the status of an entity + * via the REST API of another domain. + * <p> + * Note tests for this depend on a REST server so are in other projects; search for *Mirror*Test, + * as well as *BrooklynNode*Test. */ +@Catalog(name="Brooklyn Entity Mirror", description="Provides an entity which can sit in one brooklyn " + + "domain and reflect the status of an entity via the REST API of another domain.") +@ImplementedBy(BrooklynEntityMirrorImpl.class) +public interface BrooklynEntityMirror extends Entity { + + // caller must specify this: + public static final ConfigKey<String> MIRRORED_ENTITY_URL = ConfigKeys.newStringConfigKey("brooklyn.mirror.entity_url", + "URL for the entity in the remote Brooklyn mgmt endpoint"); + + // caller may specify this for reference: + public static final ConfigKey<String> MIRRORED_ENTITY_ID = ConfigKeys.newStringConfigKey("brooklyn.mirror.entity_id", + "Brooklyn ID of the entity being mirrored"); + + // must be specified if required (could be inherited if parent/config is available at init time, but it's not currently) + public static final ConfigKey<String> MANAGEMENT_USER = BrooklynNode.MANAGEMENT_USER; + public static final ConfigKey<String> MANAGEMENT_PASSWORD = BrooklynNode.MANAGEMENT_PASSWORD; + + public static final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newConfigKey(Duration.class, "brooklyn.mirror.poll_period", + "Frequency to poll for client sensors", Duration.FIVE_SECONDS); + + public static final AttributeSensor<String> MIRROR_STATUS = Sensors.newStringSensor("brooklyn.mirror.monitoring_status"); + @SuppressWarnings("rawtypes") + public static final AttributeSensor<Map> MIRROR_SUMMARY = Sensors.newSensor(Map.class, "brooklyn.mirror.summary", + "The json map returned by the entity rest endpoint (ie the EntitySummary model)"); + public static final AttributeSensor<String> MIRROR_CATALOG_ITEM_ID = Sensors.newStringSensor("brooklyn.mirror.catalog_item_id", + "The catalog item id of the mirrored entity in the remote brooklyn"); + + public EntityHttpClient http(); + +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java new file mode 100644 index 0000000..34479d6 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.brooklynnode; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Callable; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.effector.core.EffectorBody; +import org.apache.brooklyn.entity.core.AbstractEntity; +import org.apache.brooklyn.entity.core.Attributes; +import org.apache.brooklyn.entity.core.Entities; +import org.apache.brooklyn.entity.core.EntityDynamicType; +import org.apache.brooklyn.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.entity.lifecycle.ServiceStateLogic; +import org.apache.brooklyn.sensor.core.Sensors; +import org.apache.brooklyn.sensor.feed.http.HttpFeed; +import org.apache.brooklyn.sensor.feed.http.HttpPollConfig; +import org.apache.brooklyn.util.collections.Jsonya; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.http.HttpToolResponse; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.net.Urls; +import org.apache.http.HttpStatus; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.net.MediaType; +import com.google.gson.Gson; + +public class BrooklynEntityMirrorImpl extends AbstractEntity implements BrooklynEntityMirror { + @SuppressWarnings("rawtypes") + private class MirrorSummary implements Function<HttpToolResponse, Map> { + @Override + public Map apply(HttpToolResponse input) { + Map<?, ?> entitySummary = new Gson().fromJson(input.getContentAsString(), Map.class); + String catalogItemId = (String)entitySummary.get("catalogItemId"); + setAttribute(MIRROR_CATALOG_ITEM_ID, catalogItemId); + return entitySummary; + } + } + + private HttpFeed mirror; + + + //Passively mirror entity's state + @Override + protected void initEnrichers() {} + + @Override + public void init() { + super.init(); + connectSensorsAsync(); + + //start spinning, could take some time before MIRRORED_ENTITY_URL is available for first time mirroring + setAttribute(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.STARTING); + } + + @Override + public void rebind() { + super.rebind(); + connectSensorsAsync(); + } + + protected void connectSensorsAsync() { + Callable<Void> asyncTask = new Callable<Void>() { + @Override + public Void call() throws Exception { + //blocks until available (could be a task) + String mirroredEntityUrl = getConfig(MIRRORED_ENTITY_URL); + Preconditions.checkNotNull(mirroredEntityUrl, "Required config: "+MIRRORED_ENTITY_URL); + + connectSensors(mirroredEntityUrl); + return null; + } + }; + + DynamicTasks.queueIfPossible( + Tasks.<Void>builder() + .name("Start entity mirror feed") + .body(asyncTask) + .build()) + .orSubmitAsync(this); + } + + protected void connectSensors(String mirroredEntityUrl) { + Function<HttpToolResponse, Void> mirrorSensors = new Function<HttpToolResponse,Void>() { + @SuppressWarnings("rawtypes") + @Override + public Void apply(HttpToolResponse input) { + Map sensors = new Gson().fromJson(input.getContentAsString(), Map.class); + for (Object kv: sensors.entrySet()) + setAttribute(Sensors.newSensor(Object.class, ""+((Map.Entry)kv).getKey()), ((Map.Entry)kv).getValue()); + setAttribute(MIRROR_STATUS, "normal"); + return null; + } + }; + + final BrooklynEntityMirrorImpl self = this; + mirror = HttpFeed.builder().entity(this) + .baseUri(mirroredEntityUrl) + .credentialsIfNotNull(getConfig(BrooklynNode.MANAGEMENT_USER), getConfig(BrooklynNode.MANAGEMENT_PASSWORD)) + .period(getConfig(POLL_PERIOD)) + .poll(HttpPollConfig.forMultiple() + .suburl("/sensors/current-state") + .onSuccess(mirrorSensors) + .onFailureOrException(new Function<Object, Void>() { + @Override + public Void apply(Object input) { + ServiceStateLogic.updateMapSensorEntry(self, Attributes.SERVICE_PROBLEMS, "mirror-feed", "error contacting service"); + if (input instanceof HttpToolResponse) { + int responseCode = ((HttpToolResponse)input).getResponseCode(); + if (responseCode == HttpStatus.SC_NOT_FOUND) { + //the remote entity no longer exists + Entities.unmanage(self); + } + } + return null; + } + })) + .poll(HttpPollConfig.forSensor(MIRROR_SUMMARY).onSuccess(new MirrorSummary())).build(); + + populateEffectors(); + } + + private void populateEffectors() { + HttpToolResponse result = http().get("/effectors"); + Collection<?> cfgEffectors = new Gson().fromJson(result.getContentAsString(), Collection.class); + Collection<Effector<String>> remoteEntityEffectors = RemoteEffectorBuilder.of(cfgEffectors); + EntityDynamicType mutableEntityType = getMutableEntityType(); + for (Effector<String> eff : remoteEntityEffectors) { + //remote already started + if ("start".equals(eff.getName())) continue; + mutableEntityType.addEffector(eff); + } + } + + protected void disconnectSensors() { + if (mirror != null) mirror.stop(); + } + + @Override + public void destroy() { + disconnectSensors(); + } + + @Override + public EntityHttpClient http() { + return new EntityHttpClientImpl(this, MIRRORED_ENTITY_URL); + } + + public static class RemoteEffector<T> extends EffectorBody<T> { + public final String remoteEffectorName; + public final Function<HttpToolResponse, T> resultParser; + + /** creates an effector implementation which POSTs to a remote effector endpoint, optionally converting + * the byte[] response (if resultParser is null then null is returned) */ + public RemoteEffector(String remoteEffectorName, @Nullable Function<HttpToolResponse,T> resultParser) { + this.remoteEffectorName = remoteEffectorName; + this.resultParser = resultParser; + } + + @Override + public T call(ConfigBag parameters) { + MutableMap<String, String> headers = MutableMap.of(com.google.common.net.HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString()); + byte[] httpBody = Jsonya.of(parameters.getAllConfig()).toString().getBytes(); + String effectorUrl = Urls.mergePaths("effectors", Urls.encode(remoteEffectorName)); + HttpToolResponse result = ((BrooklynEntityMirror)entity()).http().post(effectorUrl, headers, httpBody); + if (resultParser!=null) return resultParser.apply(result); + else return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNode.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNode.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNode.java new file mode 100644 index 0000000..51b1281 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNode.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.brooklynnode; + +import java.net.InetAddress; +import java.net.URI; +import java.util.List; +import java.util.Map; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.ImplementedBy; +import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode; +import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.BrooklynVersion; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.config.MapConfigKey; +import org.apache.brooklyn.effector.core.Effectors; +import org.apache.brooklyn.entity.core.BrooklynConfigKeys; +import org.apache.brooklyn.entity.java.UsesJava; +import org.apache.brooklyn.entity.software.base.SoftwareProcess; +import org.apache.brooklyn.sensor.core.BasicAttributeSensor; +import org.apache.brooklyn.sensor.core.BasicAttributeSensorAndConfigKey; +import org.apache.brooklyn.sensor.core.PortAttributeSensorAndConfigKey; +import org.apache.brooklyn.sensor.core.Sensors; +import org.apache.brooklyn.sensor.core.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.flags.SetFromFlag; +import org.apache.brooklyn.util.net.Networking; +import org.apache.brooklyn.util.ssh.BashCommands; +import org.apache.brooklyn.util.time.Duration; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.reflect.TypeToken; + +@Catalog(name="Brooklyn Node", description="Deploys a Brooklyn management server") +@ImplementedBy(BrooklynNodeImpl.class) +public interface BrooklynNode extends SoftwareProcess, UsesJava { + + @SuppressWarnings("serial") + @SetFromFlag("copyToRundir") + public static final BasicAttributeSensorAndConfigKey<Map<String,String>> COPY_TO_RUNDIR = new BasicAttributeSensorAndConfigKey<Map<String,String>>( + new TypeToken<Map<String,String>>() {}, "brooklynnode.copytorundir", "URLs of resources to be copied across to the server, giving the path they are to be copied to", MutableMap.<String,String>of()); + + @SetFromFlag("version") + public static final ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(BrooklynConfigKeys.SUGGESTED_VERSION, "0.8.0-SNAPSHOT"); // BROOKLYN_VERSION + + @SetFromFlag("distroUploadUrl") + public static final ConfigKey<String> DISTRO_UPLOAD_URL = ConfigKeys.newStringConfigKey( + "brooklynnode.distro.uploadurl", "URL for uploading the brooklyn distro (retrieved locally and pushed to remote install location. Takes precedence over downloadUrl, if non-null)", null); + + // Note that download URL only supports versions in org.apache.brooklyn, so not 0.6.0 and earlier + // (which used maven group io.brooklyn). Aled thinks we can live with that. + @SetFromFlag("downloadUrl") + BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new StringAttributeSensorAndConfigKey( + SoftwareProcess.DOWNLOAD_URL, + "<#if version?contains(\"SNAPSHOT\")>"+ + "https://repository.apache.org/service/local/artifact/maven/redirect?r=snapshots&g=org.apache.brooklyn&v=${version}&a=brooklyn-dist&c=dist&e=tar.gz" + + "<#else>"+ + "http://search.maven.org/remotecontent?filepath=org/apache/brooklyn/brooklyn-dist/${version}/brooklyn-dist-${version}-dist.tar.gz"+ + "</#if>"); + + @SetFromFlag("subpathInArchive") + ConfigKey<String> SUBPATH_IN_ARCHIVE = ConfigKeys.newStringConfigKey("brooklynnode.download.archive.subpath", + "Path to the main directory in the archive being supplied for installation; " + + "to use the root of an archive, specify '.'; " + + "default value taken based on download URL (e.g. 'name' for 'http://path/name.tgz' or 'http://path/name-dist.tgz') " + + "falling back to an appropriate value for brooklyn, " + + "e.g. 'brooklyn-"+BrooklynVersion.INSTANCE.getVersion()+"'", null); + + @SetFromFlag("managementUser") + ConfigKey<String> MANAGEMENT_USER = ConfigKeys.newConfigKey("brooklynnode.managementUser", + "The user for logging into the brooklyn web-console (also used for health-checks)", + "admin"); + + @SetFromFlag("managementPassword") + ConfigKey<String> MANAGEMENT_PASSWORD = + ConfigKeys.newStringConfigKey("brooklynnode.managementPassword", "Password for MANAGEMENT_USER", null); + + /** useful e.g. with {@link BashCommands#generateKeyInDotSshIdRsaIfNotThere() } */ + @SetFromFlag("extraCustomizationScript") + ConfigKey<String> EXTRA_CUSTOMIZATION_SCRIPT = ConfigKeys.newStringConfigKey("brooklynnode.customization.extraScript", + "Optional additional script commands to run as part of customization; this might e.g. ensure id_rsa is set up", + null); + + static enum ExistingFileBehaviour { + DO_NOT_USE, USE_EXISTING, OVERWRITE, FAIL + } + + @SetFromFlag("onExistingProperties") + ConfigKey<ExistingFileBehaviour> ON_EXISTING_PROPERTIES_FILE = ConfigKeys.newConfigKey(ExistingFileBehaviour.class, + "brooklynnode.properties.file.ifExists", + "What to do in the case where a global brooklyn.properties already exists", + ExistingFileBehaviour.FAIL); + + @SetFromFlag("launchCommand") + ConfigKey<String> LAUNCH_COMMAND = ConfigKeys.newStringConfigKey("brooklynnode.launch.command", + "Path to the script to launch Brooklyn / the app relative to the subpath in the archive, defaulting to 'bin/brooklyn'", + "bin/brooklyn"); + + @SetFromFlag("launchParameters") + ConfigKey<String> EXTRA_LAUNCH_PARAMETERS = ConfigKeys.newStringConfigKey("brooklynnode.launch.parameters.extra", + "Launch parameters passed on the CLI, in addition to 'launch' and parameters implied by other config keys (and placed afterwards on the command line)"); + + @SetFromFlag("launchCommandCreatesPidFile") + ConfigKey<Boolean> LAUNCH_COMMAND_CREATES_PID_FILE = ConfigKeys.newBooleanConfigKey("brooklynnode.launch.command.pid.updated", + "Whether the launch script creates/updates the PID file, if not the entity will do so, " + + "but note it will not necessarily kill sub-processes", + true); + + @SetFromFlag("app") + public static final BasicAttributeSensorAndConfigKey<String> APP = new BasicAttributeSensorAndConfigKey<String>( + String.class, "brooklynnode.app", "Application (fully qualified class name) to launch using the brooklyn CLI", null); + + @SetFromFlag("locations") + public static final BasicAttributeSensorAndConfigKey<String> LOCATIONS = new BasicAttributeSensorAndConfigKey<String>( + String.class, "brooklynnode.locations", "Locations to use when launching the app", null); + + /** + * Exposed just for testing; remote path is not passed into the launched brooklyn so this won't be used! + * This will likely change in a future version. + */ + @VisibleForTesting + @SetFromFlag("brooklynGlobalPropertiesRemotePath") + public static final ConfigKey<String> BROOKLYN_GLOBAL_PROPERTIES_REMOTE_PATH = ConfigKeys.newStringConfigKey( + "brooklynnode.brooklynproperties.global.remotepath", + "Remote path for the global brooklyn.properties file to be uploaded", "${HOME}/.brooklyn/brooklyn.properties; "+ + "only useful for testing as this path will not be used on the remote system"); + + @SetFromFlag("brooklynGlobalPropertiesUri") + public static final ConfigKey<String> BROOKLYN_GLOBAL_PROPERTIES_URI = ConfigKeys.newStringConfigKey( + "brooklynnode.brooklynproperties.global.uri", "URI for the global brooklyn properties file (uploaded to ~/.brooklyn/brooklyn.properties)", null); + + @SetFromFlag("brooklynGlobalPropertiesContents") + public static final ConfigKey<String> BROOKLYN_GLOBAL_PROPERTIES_CONTENTS = ConfigKeys.newStringConfigKey( + "brooklynnode.brooklynproperties.global.contents", "Contents for the global brooklyn properties file (uploaded to ~/.brooklyn/brooklyn.properties)", null); + + @SetFromFlag("brooklynLocalPropertiesRemotePath") + public static final ConfigKey<String> BROOKLYN_LOCAL_PROPERTIES_REMOTE_PATH = ConfigKeys.newStringConfigKey( + "brooklynnode.brooklynproperties.local.remotepath", "Remote path for the launch-specific brooklyn.properties file to be uploaded", "${driver.runDir}/brooklyn-local.properties"); + + @SetFromFlag("brooklynLocalPropertiesUri") + public static final ConfigKey<String> BROOKLYN_LOCAL_PROPERTIES_URI = ConfigKeys.newStringConfigKey( + "brooklynnode.brooklynproperties.local.uri", "URI for the launch-specific brooklyn properties file", null); + + @SetFromFlag("brooklynLocalPropertiesContents") + public static final ConfigKey<String> BROOKLYN_LOCAL_PROPERTIES_CONTENTS = ConfigKeys.newStringConfigKey( + "brooklynnode.brooklynproperties.local.contents", "Contents for the launch-specific brooklyn properties file", null); + + // For use in testing primarily + /** @deprecated since 0.7.0; TODO this should support BOM files */ + @Deprecated + @SetFromFlag("brooklynCatalogRemotePath") + public static final ConfigKey<String> BROOKLYN_CATALOG_REMOTE_PATH = ConfigKeys.newStringConfigKey( + "brooklynnode.brooklyncatalog.remotepath", "Remote path for the brooklyn catalog.xml file to be uploaded", "${HOME}/.brooklyn/catalog.xml"); + + /** @deprecated since 0.7.0; TODO this should support BOM files */ + @Deprecated + @SetFromFlag("brooklynCatalogUri") + public static final ConfigKey<String> BROOKLYN_CATALOG_URI = ConfigKeys.newStringConfigKey( + "brooklynnode.brooklyncatalog.uri", "URI for the brooklyn catalog.xml file (uploaded to ~/.brooklyn/catalog.xml)", null); + + /** @deprecated since 0.7.0; TODO this should support BOM files */ + @Deprecated + @SetFromFlag("brooklynCatalogContents") + public static final ConfigKey<String> BROOKLYN_CATALOG_CONTENTS = ConfigKeys.newStringConfigKey( + "brooklynnode.brooklyncatalog.contents", "Contents for the brooklyn catalog.xml file (uploaded to ~/.brooklyn/catalog.xml)", null); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @SetFromFlag("enabledHttpProtocols") + public static final BasicAttributeSensorAndConfigKey<List<String>> ENABLED_HTTP_PROTOCOLS = new BasicAttributeSensorAndConfigKey( + List.class, "brooklynnode.webconsole.enabledHttpProtocols", "List of enabled protocols (e.g. http, https)", ImmutableList.of("http")); + + @SetFromFlag("httpPort") + public static final PortAttributeSensorAndConfigKey HTTP_PORT = new PortAttributeSensorAndConfigKey( + "brooklynnode.webconsole.httpPort", "HTTP Port for the brooklyn web-console", "8081+"); + + @SetFromFlag("httpsPort") + public static final PortAttributeSensorAndConfigKey HTTPS_PORT = new PortAttributeSensorAndConfigKey( + "brooklynnode.webconsole.httpsPort", "HTTPS Port for the brooklyn web-console", "8443+"); + + @SetFromFlag("noWebConsoleSecurity") + public static final BasicAttributeSensorAndConfigKey<Boolean> NO_WEB_CONSOLE_AUTHENTICATION = new BasicAttributeSensorAndConfigKey<Boolean>( + Boolean.class, "brooklynnode.webconsole.nosecurity", "Whether to start the web console with no security", false); + + @SetFromFlag("bindAddress") + public static final BasicAttributeSensorAndConfigKey<InetAddress> WEB_CONSOLE_BIND_ADDRESS = new BasicAttributeSensorAndConfigKey<InetAddress>( + InetAddress.class, "brooklynnode.webconsole.address.bind", "Specifies the IP address of the NIC to bind the Brooklyn Management Console to (default 0.0.0.0)", Networking.ANY_NIC); + + @SetFromFlag("publicAddress") + public static final BasicAttributeSensorAndConfigKey<InetAddress> WEB_CONSOLE_PUBLIC_ADDRESS = new BasicAttributeSensorAndConfigKey<InetAddress>( + InetAddress.class, "brooklynnode.webconsole.address.public", "Specifies the public IP address or hostname for the Brooklyn Management Console"); + + public static final AttributeSensor<Boolean> WEB_CONSOLE_ACCESSIBLE = Sensors.newBooleanSensor( + "brooklynnode.webconsole.up", "Whether the web console is responding normally"); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @SetFromFlag("classpath") + public static final BasicAttributeSensorAndConfigKey<List<String>> CLASSPATH = new BasicAttributeSensorAndConfigKey( + List.class, "brooklynnode.classpath", "classpath to use, as list of URL entries", Lists.newArrayList()); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @SetFromFlag("portMapper") + public static final ConfigKey<Function<? super Integer, ? extends Integer>> PORT_MAPPER = (ConfigKey) ConfigKeys.newConfigKey(Function.class, + "brooklynnode.webconsole.portMapper", "Function for mapping private to public ports, for use in inferring the brooklyn URI", Functions.<Integer>identity()); + + public static final AttributeSensor<URI> WEB_CONSOLE_URI = new BasicAttributeSensor<URI>( + URI.class, "brooklynnode.webconsole.url", "URL of the brooklyn web-console"); + + public static final AttributeSensor<ManagementNodeState> MANAGEMENT_NODE_STATE = new BasicAttributeSensor<ManagementNodeState>( + ManagementNodeState.class, "brooklynnode.ha.state", "High-availability state of the management node (MASTER, HOT_STANDBY, etc)"); + + public static final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newConfigKey(Duration.class, "brooklynnode.poll_period", + "Frequency to poll for client sensors", Duration.seconds(2)); + + public interface DeployBlueprintEffector { + ConfigKey<Map<String,Object>> BLUEPRINT_CAMP_PLAN = new MapConfigKey<Object>(Object.class, "blueprintPlan", + "CAMP plan for the blueprint to be deployed; currently only supports Java map or JSON string (not yet YAML)"); + ConfigKey<String> BLUEPRINT_TYPE = ConfigKeys.newStringConfigKey("blueprintType"); + ConfigKey<Map<String,Object>> BLUEPRINT_CONFIG = new MapConfigKey<Object>(Object.class, "blueprintConfig"); + Effector<String> DEPLOY_BLUEPRINT = Effectors.effector(String.class, "deployBlueprint") + .description("Deploy a blueprint, either given a plan (as Java map or JSON string for a map), or given URL and optional config") + .parameter(BLUEPRINT_TYPE) + .parameter(BLUEPRINT_CONFIG) + .parameter(BLUEPRINT_CAMP_PLAN) + .buildAbstract(); + } + + public static final Effector<String> DEPLOY_BLUEPRINT = DeployBlueprintEffector.DEPLOY_BLUEPRINT; + + public interface ShutdownEffector { + ConfigKey<Boolean> STOP_APPS_FIRST = ConfigKeys.newBooleanConfigKey("stopAppsFirst", "Whether to stop apps before shutting down"); + ConfigKey<Boolean> FORCE_SHUTDOWN_ON_ERROR = ConfigKeys.newBooleanConfigKey("forceShutdownOnError", "Force shutdown if apps fail to stop or timeout"); + ConfigKey<Duration> SHUTDOWN_TIMEOUT = ConfigKeys.newConfigKey(Duration.class, "shutdownTimeout", "A maximum delay to wait for apps to gracefully stop before giving up or forcibly exiting"); + ConfigKey<Duration> REQUEST_TIMEOUT = ConfigKeys.newConfigKey(Duration.class, "requestTimeout", "Maximum time to block the request for the shutdown to finish, 0 to wait infinitely"); + ConfigKey<Duration> DELAY_FOR_HTTP_RETURN = ConfigKeys.newConfigKey(Duration.class, "delayForHttpReturn", "The delay before exiting the process, to permit the REST response to be returned"); + Effector<Void> SHUTDOWN = Effectors.effector(Void.class, "shutdown") + .description("Shutdown the remote brooklyn instance (stops via the REST API only; leaves any VM)") + .parameter(STOP_APPS_FIRST) + .parameter(FORCE_SHUTDOWN_ON_ERROR) + .parameter(SHUTDOWN_TIMEOUT) + .parameter(REQUEST_TIMEOUT) + .parameter(DELAY_FOR_HTTP_RETURN) + .buildAbstract(); + } + + public static final Effector<Void> SHUTDOWN = ShutdownEffector.SHUTDOWN; + + public interface StopNodeButLeaveAppsEffector { + ConfigKey<Duration> TIMEOUT = ConfigKeys.newConfigKey(Duration.class, "timeout", "How long to wait before giving up on stopping the node", Duration.ONE_HOUR); + Effector<Void> STOP_NODE_BUT_LEAVE_APPS = Effectors.effector(Void.class, "stopNodeButLeaveApps") + .description("Stop the Brooklyn process, and any VM created, and unmanage this entity; but if it was managing other applications, leave them running") + .parameter(TIMEOUT) + .buildAbstract(); + } + + public static final Effector<Void> STOP_NODE_BUT_LEAVE_APPS = StopNodeButLeaveAppsEffector.STOP_NODE_BUT_LEAVE_APPS; + + public interface StopNodeAndKillAppsEffector { + ConfigKey<Duration> TIMEOUT = ConfigKeys.newConfigKey(Duration.class, "timeout", "How long to wait before giving up on stopping the node", Duration.ONE_HOUR); + Effector<Void> STOP_NODE_AND_KILL_APPS = Effectors.effector(Void.class, "stopNodeAndKillApps") + .description("Stop all apps managed by the Brooklyn process, stop the process, and any VM created, and unmanage this entity") + .parameter(TIMEOUT) + .buildAbstract(); + } + + public static final Effector<Void> STOP_NODE_AND_KILL_APPS = StopNodeAndKillAppsEffector.STOP_NODE_AND_KILL_APPS; + + public interface SetHighAvailabilityPriorityEffector { + ConfigKey<Integer> PRIORITY = ConfigKeys.newIntegerConfigKey("priority", "HA priority"); + Effector<Integer> SET_HIGH_AVAILABILITY_PRIORITY = Effectors.effector(Integer.class, "setHighAvailabilityPriority") + .description("Set the HA priority on the node, returning the old priority") + .parameter(PRIORITY) + .buildAbstract(); + } + + public static final Effector<Integer> SET_HIGH_AVAILABILITY_PRIORITY = SetHighAvailabilityPriorityEffector.SET_HIGH_AVAILABILITY_PRIORITY; + + public interface SetHighAvailabilityModeEffector { + ConfigKey<HighAvailabilityMode> MODE = ConfigKeys.newConfigKey(HighAvailabilityMode.class, "mode", "HA mode"); + Effector<ManagementNodeState> SET_HIGH_AVAILABILITY_MODE = Effectors.effector(ManagementNodeState.class, "setHighAvailabilityMode") + .description("Set the HA mode on the node, returning the existing state") + .parameter(MODE) + .buildAbstract(); + } + + public static final Effector<ManagementNodeState> SET_HIGH_AVAILABILITY_MODE = SetHighAvailabilityModeEffector.SET_HIGH_AVAILABILITY_MODE; + + public EntityHttpClient http(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeDriver.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeDriver.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeDriver.java new file mode 100644 index 0000000..e45eab1 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeDriver.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.brooklynnode; + +import org.apache.brooklyn.entity.java.JavaSoftwareProcessDriver; + +public interface BrooklynNodeDriver extends JavaSoftwareProcessDriver { + + void clearInstallDir(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java new file mode 100644 index 0000000..4e76f70 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.brooklynnode; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.mgmt.TaskAdaptable; +import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.render.RendererHints; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; +import org.apache.brooklyn.effector.core.EffectorBody; +import org.apache.brooklyn.effector.core.Effectors; +import org.apache.brooklyn.entity.brooklynnode.EntityHttpClient.ResponseCodePredicates; +import org.apache.brooklyn.entity.brooklynnode.effector.BrooklynNodeUpgradeEffectorBody; +import org.apache.brooklyn.entity.brooklynnode.effector.SetHighAvailabilityModeEffectorBody; +import org.apache.brooklyn.entity.brooklynnode.effector.SetHighAvailabilityPriorityEffectorBody; +import org.apache.brooklyn.entity.core.Attributes; +import org.apache.brooklyn.entity.core.Entities; +import org.apache.brooklyn.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.entity.lifecycle.ServiceStateLogic; +import org.apache.brooklyn.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic; +import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl; +import org.apache.brooklyn.entity.software.base.SoftwareProcess.StopSoftwareParameters.StopMode; +import org.apache.brooklyn.entity.software.base.lifecycle.MachineLifecycleEffectorTasks; +import org.apache.brooklyn.entity.trait.Startable; +import org.apache.http.HttpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.brooklyn.location.access.BrooklynAccessUtils; +import org.apache.brooklyn.location.basic.Locations; +import org.apache.brooklyn.sensor.enricher.Enrichers; +import org.apache.brooklyn.sensor.feed.ConfigToAttributes; +import org.apache.brooklyn.sensor.feed.http.HttpFeed; +import org.apache.brooklyn.sensor.feed.http.HttpPollConfig; +import org.apache.brooklyn.sensor.feed.http.HttpValueFunctions; +import org.apache.brooklyn.sensor.feed.http.JsonFunctions; +import org.apache.brooklyn.util.collections.Jsonya; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.http.HttpToolResponse; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.TaskTags; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.PropagatedRuntimeException; +import org.apache.brooklyn.util.guava.Functionals; +import org.apache.brooklyn.util.javalang.Enums; +import org.apache.brooklyn.util.javalang.JavaClassNames; +import org.apache.brooklyn.util.repeat.Repeater; +import org.apache.brooklyn.util.text.Strings; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.net.HostAndPort; +import com.google.common.util.concurrent.Runnables; +import com.google.gson.Gson; + +public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNode { + + private static final Logger log = LoggerFactory.getLogger(BrooklynNodeImpl.class); + + static { + RendererHints.register(WEB_CONSOLE_URI, RendererHints.namedActionWithUrl()); + } + + private static class UnmanageTask implements Runnable { + private Task<?> latchTask; + private Entity unmanageEntity; + + public UnmanageTask(@Nullable Task<?> latchTask, Entity unmanageEntity) { + this.latchTask = latchTask; + this.unmanageEntity = unmanageEntity; + } + + public void run() { + if (latchTask != null) { + latchTask.blockUntilEnded(); + } else { + log.debug("No latch task provided for UnmanageTask, falling back to fixed wait"); + Time.sleep(Duration.FIVE_SECONDS); + } + synchronized (this) { + Entities.unmanage(unmanageEntity); + } + } + } + + private HttpFeed httpFeed; + + public BrooklynNodeImpl() { + super(); + } + + public BrooklynNodeImpl(Entity parent) { + super(parent); + } + + @Override + public Class<?> getDriverInterface() { + return BrooklynNodeDriver.class; + } + + @Override + public void init() { + super.init(); + getMutableEntityType().addEffector(DeployBlueprintEffectorBody.DEPLOY_BLUEPRINT); + getMutableEntityType().addEffector(ShutdownEffectorBody.SHUTDOWN); + getMutableEntityType().addEffector(StopNodeButLeaveAppsEffectorBody.STOP_NODE_BUT_LEAVE_APPS); + getMutableEntityType().addEffector(StopNodeAndKillAppsEffectorBody.STOP_NODE_AND_KILL_APPS); + getMutableEntityType().addEffector(SetHighAvailabilityPriorityEffectorBody.SET_HIGH_AVAILABILITY_PRIORITY); + getMutableEntityType().addEffector(SetHighAvailabilityModeEffectorBody.SET_HIGH_AVAILABILITY_MODE); + getMutableEntityType().addEffector(BrooklynNodeUpgradeEffectorBody.UPGRADE); + } + + @Override + protected void preStart() { + ServiceNotUpLogic.clearNotUpIndicator(this, SHUTDOWN.getName()); + } + + @Override + protected void preStopConfirmCustom() { + super.preStopConfirmCustom(); + ConfigBag stopParameters = BrooklynTaskTags.getCurrentEffectorParameters(); + if (Boolean.TRUE.equals(getAttribute(BrooklynNode.WEB_CONSOLE_ACCESSIBLE)) && + stopParameters != null && !stopParameters.containsKey(ShutdownEffector.STOP_APPS_FIRST)) { + Preconditions.checkState(getChildren().isEmpty(), "Can't stop instance with running applications."); + } + } + + @Override + protected void preStop() { + super.preStop(); + if (MachineLifecycleEffectorTasks.canStop(getStopProcessModeParam(), this)) { + shutdownGracefully(); + } + } + + private StopMode getStopProcessModeParam() { + ConfigBag parameters = BrooklynTaskTags.getCurrentEffectorParameters(); + if (parameters != null) { + return parameters.get(StopSoftwareParameters.STOP_PROCESS_MODE); + } else { + return StopSoftwareParameters.STOP_PROCESS_MODE.getDefaultValue(); + } + } + + @Override + protected void preRestart() { + super.preRestart(); + //restart will kill the process, try to shut down before that + shutdownGracefully(); + DynamicTasks.queue("pre-restart", new Runnable() { public void run() { + //set by shutdown - clear it so the entity starts cleanly. Does the indicator bring any value at all? + ServiceNotUpLogic.clearNotUpIndicator(BrooklynNodeImpl.this, SHUTDOWN.getName()); + }}); + } + + private void shutdownGracefully() { + // Shutdown only if accessible: any of stop_* could have already been called. + // Don't check serviceUp=true because stop() will already have set serviceUp=false && expectedState=stopping + if (Boolean.TRUE.equals(getAttribute(BrooklynNode.WEB_CONSOLE_ACCESSIBLE))) { + queueShutdownTask(); + queueWaitExitTask(); + } else { + log.info("Skipping graceful shutdown call, because web-console not up for {}", this); + } + } + + private void queueWaitExitTask() { + //give time to the process to die gracefully after closing the shutdown call + DynamicTasks.queue(Tasks.builder().name("wait for graceful stop").body(new Runnable() { + @Override + public void run() { + DynamicTasks.markInessential(); + boolean cleanExit = Repeater.create() + .until(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return !getDriver().isRunning(); + } + }) + .backoffTo(Duration.ONE_SECOND) + .limitTimeTo(Duration.ONE_MINUTE) + .run(); + if (!cleanExit) { + log.warn("Tenant " + this + " didn't stop cleanly after shutdown. Timeout waiting for process exit."); + } + } + }).build()); + } + + @Override + protected void postStop() { + super.postStop(); + if (isMachineStopped()) { + // Don't unmanage in entity's task context as it will self-cancel the task. Wait for the stop effector to complete (and all parent entity tasks). + // If this is not enough (still getting Caused by: java.util.concurrent.CancellationException: null) then + // we could wait for BrooklynTaskTags.getTasksInEntityContext(ExecutionManager, this).isEmpty(); + Task<?> stopEffectorTask = BrooklynTaskTags.getClosestEffectorTask(Tasks.current(), Startable.STOP); + Task<?> topEntityTask = getTopEntityTask(stopEffectorTask); + getManagementContext().getExecutionManager().submit(new UnmanageTask(topEntityTask, this)); + } + } + + private Task<?> getTopEntityTask(Task<?> stopEffectorTask) { + Entity context = BrooklynTaskTags.getContextEntity(stopEffectorTask); + Task<?> topTask = stopEffectorTask; + while (true) { + Task<?> parentTask = topTask.getSubmittedByTask(); + Entity parentContext = BrooklynTaskTags.getContextEntity(parentTask); + if (parentTask == null || parentContext != context) { + return topTask; + } else { + topTask = parentTask; + } + } + } + + private boolean isMachineStopped() { + // Don't rely on effector parameters, check if there is still a machine running. + // If the entity was previously stopped with STOP_MACHINE_MODE=StopMode.NEVER + // and a second time with STOP_MACHINE_MODE=StopMode.IF_NOT_STOPPED, then the + // machine is still running, but there is no deterministic way to infer this from + // the parameters alone. + return Locations.findUniqueSshMachineLocation(this.getLocations()).isAbsent(); + } + + private void queueShutdownTask() { + ConfigBag stopParameters = BrooklynTaskTags.getCurrentEffectorParameters(); + ConfigBag shutdownParameters; + if (stopParameters != null) { + shutdownParameters = ConfigBag.newInstanceCopying(stopParameters); + } else { + shutdownParameters = ConfigBag.newInstance(); + } + shutdownParameters.putIfAbsent(ShutdownEffector.REQUEST_TIMEOUT, Duration.ONE_MINUTE); + shutdownParameters.putIfAbsent(ShutdownEffector.FORCE_SHUTDOWN_ON_ERROR, Boolean.TRUE); + TaskAdaptable<Void> shutdownTask = Effectors.invocation(this, SHUTDOWN, shutdownParameters); + //Mark inessential so that even if it fails the process stop task will run afterwards to clean up. + TaskTags.markInessential(shutdownTask); + DynamicTasks.queue(shutdownTask); + } + + public static class DeployBlueprintEffectorBody extends EffectorBody<String> implements DeployBlueprintEffector { + public static final Effector<String> DEPLOY_BLUEPRINT = Effectors.effector(BrooklynNode.DEPLOY_BLUEPRINT).impl(new DeployBlueprintEffectorBody()).build(); + + // TODO support YAML parsing + // TODO define a new type YamlMap for the config key which supports coercing from string and from map + @SuppressWarnings("unchecked") + public static Map<String,Object> asMap(ConfigBag parameters, ConfigKey<?> key) { + Object v = parameters.getStringKey(key.getName()); + if (v==null || (v instanceof String && Strings.isBlank((String)v))) + return null; + if (v instanceof Map) + return (Map<String, Object>) v; + + if (v instanceof String) { + // TODO ideally, parse YAML + return new Gson().fromJson((String)v, Map.class); + } + throw new IllegalArgumentException("Invalid "+JavaClassNames.simpleClassName(v)+" value for "+key+": "+v); + } + + @Override + public String call(ConfigBag parameters) { + if (log.isDebugEnabled()) + log.debug("Deploying blueprint to "+entity()+": "+parameters); + String plan = extractPlanYamlString(parameters); + return submitPlan(plan); + } + + protected String extractPlanYamlString(ConfigBag parameters) { + Object planRaw = parameters.getStringKey(BLUEPRINT_CAMP_PLAN.getName()); + if (planRaw instanceof String && Strings.isBlank((String)planRaw)) planRaw = null; + + String url = parameters.get(BLUEPRINT_TYPE); + if (url!=null && planRaw!=null) + throw new IllegalArgumentException("Cannot supply both plan and url"); + if (url==null && planRaw==null) + throw new IllegalArgumentException("Must supply plan or url"); + + Map<String, Object> config = asMap(parameters, BLUEPRINT_CONFIG); + + if (planRaw==null) { + planRaw = Jsonya.at("services").list().put("serviceType", url).putIfNotNull("brooklyn.config", config).getRootMap(); + } else { + if (config!=null) + throw new IllegalArgumentException("Cannot supply plan with config"); + } + + // planRaw might be a yaml string, or a map; if a map, convert to string + if (planRaw instanceof Map) + planRaw = Jsonya.of((Map<?,?>)planRaw).toString(); + if (!(planRaw instanceof String)) + throw new IllegalArgumentException("Invalid "+JavaClassNames.simpleClassName(planRaw)+" value for CAMP plan: "+planRaw); + + // now *all* the data is in planRaw; that is what will be submitted + return (String)planRaw; + } + + @VisibleForTesting + // Integration test for this in BrooklynNodeIntegrationTest in this project doesn't use this method, + // but a Unit test for this does, in DeployBlueprintTest -- but in the REST server project (since it runs against local) + public String submitPlan(final String plan) { + final MutableMap<String, String> headers = MutableMap.of(com.google.common.net.HttpHeaders.CONTENT_TYPE, "application/yaml"); + final AtomicReference<byte[]> response = new AtomicReference<byte[]>(); + Repeater.create() + .every(Duration.ONE_SECOND) + .backoffTo(Duration.FIVE_SECONDS) + .limitTimeTo(Duration.minutes(5)) + .repeat(Runnables.doNothing()) + .rethrowExceptionImmediately() + .until(new Callable<Boolean>() { + @Override + public Boolean call() { + HttpToolResponse result = ((BrooklynNode)entity()).http() + //will throw on non-{2xx, 403} response + .responseSuccess(Predicates.<Integer>or(ResponseCodePredicates.success(), Predicates.equalTo(HttpStatus.SC_FORBIDDEN))) + .post("/v1/applications", headers, plan.getBytes()); + if (result.getResponseCode() == HttpStatus.SC_FORBIDDEN) { + log.debug("Remote is not ready to accept requests, response is " + result.getResponseCode()); + return false; + } else { + byte[] content = result.getContent(); + response.set(content); + return true; + } + } + }) + .runRequiringTrue(); + return (String)new Gson().fromJson(new String(response.get()), Map.class).get("entityId"); + } + } + + public static class ShutdownEffectorBody extends EffectorBody<Void> implements ShutdownEffector { + public static final Effector<Void> SHUTDOWN = Effectors.effector(BrooklynNode.SHUTDOWN).impl(new ShutdownEffectorBody()).build(); + + @Override + public Void call(ConfigBag parameters) { + MutableMap<String, String> formParams = MutableMap.of(); + Lifecycle initialState = entity().getAttribute(Attributes.SERVICE_STATE_ACTUAL); + ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPING); + for (ConfigKey<?> k: new ConfigKey<?>[] { STOP_APPS_FIRST, FORCE_SHUTDOWN_ON_ERROR, SHUTDOWN_TIMEOUT, REQUEST_TIMEOUT, DELAY_FOR_HTTP_RETURN }) + formParams.addIfNotNull(k.getName(), toNullableString(parameters.get(k))); + try { + log.debug("Shutting down "+entity()+" with "+formParams); + HttpToolResponse resp = ((BrooklynNode)entity()).http() + .post("/v1/server/shutdown", + ImmutableMap.of("Brooklyn-Allow-Non-Master-Access", "true"), + formParams); + if (resp.getResponseCode() != HttpStatus.SC_NO_CONTENT) { + throw new IllegalStateException("Response code "+resp.getResponseCode()); + } + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + throw new PropagatedRuntimeException("Error shutting down remote node "+entity()+" (in state "+initialState+"): "+Exceptions.collapseText(e), e); + } + ServiceNotUpLogic.updateNotUpIndicator(entity(), SHUTDOWN.getName(), "Shutdown of remote node has completed successfuly"); + return null; + } + + private static String toNullableString(Object obj) { + if (obj == null) { + return null; + } else { + return obj.toString(); + } + } + + } + + public static class StopNodeButLeaveAppsEffectorBody extends EffectorBody<Void> implements StopNodeButLeaveAppsEffector { + public static final Effector<Void> STOP_NODE_BUT_LEAVE_APPS = Effectors.effector(BrooklynNode.STOP_NODE_BUT_LEAVE_APPS).impl(new StopNodeButLeaveAppsEffectorBody()).build(); + + @Override + public Void call(ConfigBag parameters) { + Duration timeout = parameters.get(TIMEOUT); + + ConfigBag stopParameters = ConfigBag.newInstanceCopying(parameters); + stopParameters.put(ShutdownEffector.STOP_APPS_FIRST, Boolean.FALSE); + stopParameters.putIfAbsent(ShutdownEffector.SHUTDOWN_TIMEOUT, timeout); + stopParameters.putIfAbsent(ShutdownEffector.REQUEST_TIMEOUT, timeout); + DynamicTasks.queue(Effectors.invocation(entity(), STOP, stopParameters)).asTask().getUnchecked(); + return null; + } + } + + public static class StopNodeAndKillAppsEffectorBody extends EffectorBody<Void> implements StopNodeAndKillAppsEffector { + public static final Effector<Void> STOP_NODE_AND_KILL_APPS = Effectors.effector(BrooklynNode.STOP_NODE_AND_KILL_APPS).impl(new StopNodeAndKillAppsEffectorBody()).build(); + + @Override + public Void call(ConfigBag parameters) { + Duration timeout = parameters.get(TIMEOUT); + + ConfigBag stopParameters = ConfigBag.newInstanceCopying(parameters); + stopParameters.put(ShutdownEffector.STOP_APPS_FIRST, Boolean.TRUE); + stopParameters.putIfAbsent(ShutdownEffector.SHUTDOWN_TIMEOUT, timeout); + stopParameters.putIfAbsent(ShutdownEffector.REQUEST_TIMEOUT, timeout); + DynamicTasks.queue(Effectors.invocation(entity(), STOP, stopParameters)).asTask().getUnchecked(); + return null; + } + } + + public List<String> getClasspath() { + List<String> classpath = getConfig(CLASSPATH); + if (classpath == null || classpath.isEmpty()) { + classpath = getManagementContext().getConfig().getConfig(CLASSPATH); + } + return classpath; + } + + protected List<String> getEnabledHttpProtocols() { + return getAttribute(ENABLED_HTTP_PROTOCOLS); + } + + protected boolean isHttpProtocolEnabled(String protocol) { + List<String> protocols = getAttribute(ENABLED_HTTP_PROTOCOLS); + for (String contender : protocols) { + if (protocol.equalsIgnoreCase(contender)) { + return true; + } + } + return false; + } + + @Override + protected void connectSensors() { + super.connectSensors(); + + // TODO what sensors should we poll? + ConfigToAttributes.apply(this); + + URI webConsoleUri; + if (isHttpProtocolEnabled("http")) { + int port = getConfig(PORT_MAPPER).apply(getAttribute(HTTP_PORT)); + HostAndPort accessible = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, port); + webConsoleUri = URI.create(String.format("http://%s:%s", accessible.getHostText(), accessible.getPort())); + } else if (isHttpProtocolEnabled("https")) { + int port = getConfig(PORT_MAPPER).apply(getAttribute(HTTPS_PORT)); + HostAndPort accessible = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, port); + webConsoleUri = URI.create(String.format("https://%s:%s", accessible.getHostText(), accessible.getPort())); + } else { + // web-console is not enabled + webConsoleUri = null; + } + setAttribute(WEB_CONSOLE_URI, webConsoleUri); + + if (webConsoleUri != null) { + httpFeed = HttpFeed.builder() + .entity(this) + .period(getConfig(POLL_PERIOD)) + .baseUri(webConsoleUri) + .credentialsIfNotNull(getConfig(MANAGEMENT_USER), getConfig(MANAGEMENT_PASSWORD)) + .poll(new HttpPollConfig<Boolean>(WEB_CONSOLE_ACCESSIBLE) + .suburl("/v1/server/healthy") + .onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.cast(Boolean.class))) + //if using an old distribution the path doesn't exist, but at least the instance is responding + .onFailure(HttpValueFunctions.responseCodeEquals(404)) + .setOnException(false)) + .poll(new HttpPollConfig<ManagementNodeState>(MANAGEMENT_NODE_STATE) + .suburl("/v1/server/ha/state") + .onSuccess(Functionals.chain(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.cast(String.class)), Enums.fromStringFunction(ManagementNodeState.class))) + .setOnFailureOrException(null)) + // TODO sensors for load, size, etc + .build(); + + if (!Lifecycle.RUNNING.equals(getAttribute(SERVICE_STATE_ACTUAL))) { + // TODO when updating the map, if it would change from empty to empty on a successful run (see in nginx) + ServiceNotUpLogic.updateNotUpIndicator(this, WEB_CONSOLE_ACCESSIBLE, "No response from the web console yet"); + } + addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS) + .from(WEB_CONSOLE_ACCESSIBLE) + .computing(Functionals.ifNotEquals(true).value("URL where Brooklyn listens is not answering correctly") ) + .build()); + } else { + connectServiceUpIsRunning(); + } + } + + @Override + protected void disconnectSensors() { + super.disconnectSensors(); + disconnectServiceUpIsRunning(); + if (httpFeed != null) httpFeed.stop(); + } + + @Override + public EntityHttpClient http() { + return new EntityHttpClientImpl(this, BrooklynNode.WEB_CONSOLE_URI); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeSshDriver.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeSshDriver.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeSshDriver.java new file mode 100644 index 0000000..3ba710a --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeSshDriver.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.brooklynnode; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.lang.String.format; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.InputStream; +import java.net.InetAddress; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.brooklyn.entity.brooklynnode.BrooklynNode.ExistingFileBehaviour; +import org.apache.brooklyn.entity.core.Entities; +import org.apache.brooklyn.entity.drivers.downloads.DownloadSubstituters; +import org.apache.brooklyn.entity.java.JavaSoftwareProcessSshDriver; +import org.apache.brooklyn.location.basic.SshMachineLocation; +import org.apache.brooklyn.sensor.ssh.SshEffectorTasks; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.file.ArchiveBuilder; +import org.apache.brooklyn.util.core.file.ArchiveUtils; +import org.apache.brooklyn.util.core.internal.ssh.SshTool; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.net.Networking; +import org.apache.brooklyn.util.net.Urls; +import org.apache.brooklyn.util.os.Os; +import org.apache.brooklyn.util.ssh.BashCommands; +import org.apache.brooklyn.util.text.Identifiers; +import org.apache.brooklyn.util.text.Strings; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +public class BrooklynNodeSshDriver extends JavaSoftwareProcessSshDriver implements BrooklynNodeDriver { + + public BrooklynNodeSshDriver(BrooklynNodeImpl entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + public BrooklynNodeImpl getEntity() { + return (BrooklynNodeImpl) super.getEntity(); + } + + public String getBrooklynHome() { + return getRunDir(); + } + + @Override + protected String getLogFileLocation() { + return Os.mergePathsUnix(getRunDir(), "console"); + } + + protected String getPidFile() { + return Os.mergePathsUnix(getRunDir(), "pid_java"); + } + + @Override + protected String getInstallLabelExtraSalt() { + String downloadUrl = entity.getConfig(BrooklynNode.DOWNLOAD_URL); + String uploadUrl = entity.getConfig(BrooklynNode.DISTRO_UPLOAD_URL); + if (Objects.equal(downloadUrl, BrooklynNode.DOWNLOAD_URL.getConfigKey().getDefaultValue()) && + Objects.equal(uploadUrl, BrooklynNode.DISTRO_UPLOAD_URL.getDefaultValue())) { + // if both are at the default value, then no salt + return null; + } + return Identifiers.makeIdFromHash(Objects.hashCode(downloadUrl, uploadUrl)); + } + + @Override + public void preInstall() { + resolver = Entities.newDownloader(this); + String subpath = entity.getConfig(BrooklynNode.SUBPATH_IN_ARCHIVE); + if (subpath==null) { + // assume the dir name is `basename-VERSION` where download link is `basename-VERSION-dist.tar.gz` + String uploadUrl = entity.getConfig(BrooklynNode.DISTRO_UPLOAD_URL); + String origDownloadName = uploadUrl; + if (origDownloadName==null) + origDownloadName = entity.getAttribute(BrooklynNode.DOWNLOAD_URL); + if (origDownloadName!=null) { + // BasicDownloadResolver makes it crazy hard to get the template-evaluated value of DOWNLOAD_URL + origDownloadName = DownloadSubstituters.substitute(origDownloadName, DownloadSubstituters.getBasicEntitySubstitutions(this)); + origDownloadName = Urls.decode(origDownloadName); + origDownloadName = Urls.getBasename(origDownloadName); + String downloadName = origDownloadName; + downloadName = Strings.removeFromEnd(downloadName, ".tar.gz"); + downloadName = Strings.removeFromEnd(downloadName, ".tgz"); + downloadName = Strings.removeFromEnd(downloadName, ".zip"); + if (!downloadName.equals(origDownloadName)) { + downloadName = Strings.removeFromEnd(downloadName, "-dist"); + subpath = downloadName; + } + } + } + if (subpath==null) subpath = format("brooklyn-dist-%s", getVersion()); + setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(subpath))); + } + + @Override + public void clearInstallDir() { + super.setInstallDir(null); + super.setExpandedInstallDir(null); + } + + @Override + public void install() { + String uploadUrl = entity.getConfig(BrooklynNode.DISTRO_UPLOAD_URL); + + // Need to explicitly give file, because for snapshot URLs you don't get a clean filename from the URL. + // This filename is used to generate the first URL to try: [BROOKLYN_VERSION_BELOW] + // file://$HOME/.brooklyn/repository/BrooklynNode/0.8.0-SNAPSHOT/brooklynnode-0.8.0-snapshot.tar.gz + // (DOWNLOAD_URL overrides this and has a default which comes from maven) + List<String> urls = resolver.getTargets(); + String saveAs = resolver.getFilename(); + + newScript("createInstallDir") + .body.append("mkdir -p "+getInstallDir()) + .failOnNonZeroResultCode() + .execute(); + + List<String> commands = Lists.newArrayList(); + // TODO use machine.installTo ... but that only works w a single location currently + if (uploadUrl != null) { + // Only upload if not already installed + boolean exists = newScript("checkIfInstalled") + .body.append("cd "+getInstallDir(), "test -f BROOKLYN") + .execute() == 0; + if (!exists) { + InputStream distroStream = resource.getResourceFromUrl(uploadUrl); + getMachine().copyTo(distroStream, getInstallDir()+"/"+saveAs); + } + } else { + commands.addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)); + } + commands.add(BashCommands.INSTALL_TAR); + commands.add("tar xzfv " + saveAs); + + newScript(INSTALLING). + failOnNonZeroResultCode(). + body.append(commands).execute(); + } + + @Override + public void customize() { + newScript(CUSTOMIZING) + .failOnNonZeroResultCode() + .body.append( + // workaround for AMP distribution placing everything in the root of this archive, but + // brooklyn distribution placing everything in a subdirectory: check to see if subdirectory + // with expected name exists; symlink to same directory if it doesn't + // FIXME remove when all downstream usages don't use this + format("[ -d %1$s ] || ln -s . %1$s", getExpandedInstallDir(), getExpandedInstallDir()), + + // previously we only copied bin,conf and set BROOKLYN_HOME to the install dir; + // but that does not play nicely if installing dists other than brooklyn + // (such as what is built by our artifact) + format("cp -R %s/* .", getExpandedInstallDir()), + "mkdir -p ./lib/") + .execute(); + + SshMachineLocation machine = getMachine(); + BrooklynNode entity = getEntity(); + + String brooklynGlobalPropertiesRemotePath = entity.getConfig(BrooklynNode.BROOKLYN_GLOBAL_PROPERTIES_REMOTE_PATH); + String brooklynGlobalPropertiesContents = entity.getConfig(BrooklynNode.BROOKLYN_GLOBAL_PROPERTIES_CONTENTS); + String brooklynGlobalPropertiesUri = entity.getConfig(BrooklynNode.BROOKLYN_GLOBAL_PROPERTIES_URI); + + String brooklynLocalPropertiesRemotePath = processTemplateContents(entity.getConfig(BrooklynNode.BROOKLYN_LOCAL_PROPERTIES_REMOTE_PATH)); + String brooklynLocalPropertiesContents = entity.getConfig(BrooklynNode.BROOKLYN_LOCAL_PROPERTIES_CONTENTS); + String brooklynLocalPropertiesUri = entity.getConfig(BrooklynNode.BROOKLYN_LOCAL_PROPERTIES_URI); + + String brooklynCatalogRemotePath = entity.getConfig(BrooklynNode.BROOKLYN_CATALOG_REMOTE_PATH); + String brooklynCatalogContents = entity.getConfig(BrooklynNode.BROOKLYN_CATALOG_CONTENTS); + String brooklynCatalogUri = entity.getConfig(BrooklynNode.BROOKLYN_CATALOG_URI); + + // Override the ~/.brooklyn/brooklyn.properties if required + if (brooklynGlobalPropertiesContents != null || brooklynGlobalPropertiesUri != null) { + ExistingFileBehaviour onExisting = entity.getConfig(BrooklynNode.ON_EXISTING_PROPERTIES_FILE); + Integer checkExists = DynamicTasks.queue(SshEffectorTasks.ssh("ls \""+brooklynGlobalPropertiesRemotePath+"\"").allowingNonZeroExitCode()).get(); + boolean doUpload = true; + if (checkExists==0) { + switch (onExisting) { + case USE_EXISTING: doUpload = false; break; + case OVERWRITE: break; + case DO_NOT_USE: + throw new IllegalStateException("Properties file "+brooklynGlobalPropertiesContents+" already exists and "+ + "even though it is not being used, content for it was supplied"); + case FAIL: + throw new IllegalStateException("Properties file "+brooklynGlobalPropertiesContents+" already exists and "+ + BrooklynNode.ON_EXISTING_PROPERTIES_FILE+" response is to fail"); + default: + throw new IllegalStateException("Properties file "+brooklynGlobalPropertiesContents+" already exists and "+ + BrooklynNode.ON_EXISTING_PROPERTIES_FILE+" response "+onExisting+" is unknown"); + } + } + if (onExisting==ExistingFileBehaviour.DO_NOT_USE) { + log.warn("Global properties supplied when told not to use them; no global properties exists, so it will be installed, but it will not be used."); + } + if (doUpload) + uploadFileContents(brooklynGlobalPropertiesContents, brooklynGlobalPropertiesUri, brooklynGlobalPropertiesRemotePath); + } + + // Upload a local-brooklyn.properties if required + if (brooklynLocalPropertiesContents != null || brooklynLocalPropertiesUri != null) { + uploadFileContents(brooklynLocalPropertiesContents, brooklynLocalPropertiesUri, brooklynLocalPropertiesRemotePath); + } + + // Override the ~/.brooklyn/catalog.xml if required + if (brooklynCatalogContents != null || brooklynCatalogUri != null) { + uploadFileContents(brooklynCatalogContents, brooklynCatalogUri, brooklynCatalogRemotePath); + } + + // Copy additional resources to the server + for (Map.Entry<String,String> entry : getEntity().getAttribute(BrooklynNode.COPY_TO_RUNDIR).entrySet()) { + Map<String, String> substitutions = ImmutableMap.of("RUN", getRunDir()); + String localResource = entry.getKey(); + String remotePath = entry.getValue(); + String resolvedRemotePath = remotePath; + for (Map.Entry<String,String> substitution : substitutions.entrySet()) { + String key = substitution.getKey(); + String val = substitution.getValue(); + resolvedRemotePath = resolvedRemotePath.replace("${"+key+"}", val).replace("$"+key, val); + } + machine.copyTo(MutableMap.of("permissions", "0600"), resource.getResourceFromUrl(localResource), resolvedRemotePath); + } + + for (String entry : getEntity().getClasspath()) { + // If a local folder, then create archive from contents first + if (Urls.isDirectory(entry)) { + File jarFile = ArchiveBuilder.jar().addDirContentsAt(new File(entry), "").create(); + entry = jarFile.getAbsolutePath(); + } + + // Determine filename + String destFile = entry.contains("?") ? entry.substring(0, entry.indexOf('?')) : entry; + destFile = destFile.substring(destFile.lastIndexOf('/') + 1); + + ArchiveUtils.deploy(MutableMap.<String, Object>of(), entry, machine, getRunDir(), Os.mergePaths(getRunDir(), "lib"), destFile); + } + + String cmd = entity.getConfig(BrooklynNode.EXTRA_CUSTOMIZATION_SCRIPT); + if (Strings.isNonBlank(cmd)) { + DynamicTasks.queueIfPossible( SshEffectorTasks.ssh(cmd).summary("Bespoke BrooklynNode customization script") + .requiringExitCodeZero() ) + .orSubmitAndBlock(getEntity()); + } + } + + @SuppressWarnings("deprecation") + @Override + public void launch() { + String app = getEntity().getAttribute(BrooklynNode.APP); + String locations = getEntity().getAttribute(BrooklynNode.LOCATIONS); + boolean hasLocalBrooklynProperties = getEntity().getConfig(BrooklynNode.BROOKLYN_LOCAL_PROPERTIES_CONTENTS) != null || getEntity().getConfig(BrooklynNode.BROOKLYN_LOCAL_PROPERTIES_URI) != null; + String localBrooklynPropertiesPath = processTemplateContents(getEntity().getConfig(BrooklynNode.BROOKLYN_LOCAL_PROPERTIES_REMOTE_PATH)); + InetAddress bindAddress = getEntity().getAttribute(BrooklynNode.WEB_CONSOLE_BIND_ADDRESS); + InetAddress publicAddress = getEntity().getAttribute(BrooklynNode.WEB_CONSOLE_PUBLIC_ADDRESS); + + String cmd = entity.getConfig(BrooklynNode.LAUNCH_COMMAND); + if (Strings.isBlank(cmd)) cmd = "./bin/brooklyn"; + cmd = "nohup " + cmd + " launch"; + if (app != null) { + cmd += " --app "+app; + } + if (locations != null) { + cmd += " --locations "+locations; + } + if (entity.getConfig(BrooklynNode.ON_EXISTING_PROPERTIES_FILE)==ExistingFileBehaviour.DO_NOT_USE) { + cmd += " --noGlobalBrooklynProperties"; + } + if (hasLocalBrooklynProperties) { + cmd += " --localBrooklynProperties "+localBrooklynPropertiesPath; + } + Integer webPort = null; + if (getEntity().isHttpProtocolEnabled("http")) { + webPort = getEntity().getAttribute(BrooklynNode.HTTP_PORT); + Networking.checkPortsValid(ImmutableMap.of("webPort", webPort)); + } else if (getEntity().isHttpProtocolEnabled("https")) { + webPort = getEntity().getAttribute(BrooklynNode.HTTPS_PORT); + Networking.checkPortsValid(ImmutableMap.of("webPort", webPort)); + } + if (webPort!=null) { + cmd += " --port "+webPort; + } else if (getEntity().getEnabledHttpProtocols().isEmpty()) { + // TODO sensors will probably not work in this mode + cmd += " --noConsole"; + } else { + throw new IllegalStateException("Unknown web protocol in "+BrooklynNode.ENABLED_HTTP_PROTOCOLS+" " + + "("+getEntity().getEnabledHttpProtocols()+"); expecting 'http' or 'https'"); + } + + if (bindAddress != null) { + cmd += " --bindAddress "+bindAddress.getHostAddress(); + } + if (publicAddress != null) { + cmd += " --publicAddress "+publicAddress.getHostName(); + } + if (getEntity().getAttribute(BrooklynNode.NO_WEB_CONSOLE_AUTHENTICATION)) { + cmd += " --noConsoleSecurity"; + } + if (Strings.isNonBlank(getEntity().getConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS))) { + cmd += " "+getEntity().getConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS); + } + cmd += format(" >> %s/console 2>&1 </dev/null &", getRunDir()); + + log.info("Starting brooklyn on {} using command {}", getMachine(), cmd); + + // relies on brooklyn script creating pid file + newScript(ImmutableMap.of("usePidFile", + entity.getConfig(BrooklynNode.LAUNCH_COMMAND_CREATES_PID_FILE) ? false : getPidFile()), + LAUNCHING). + body.append( + format("export BROOKLYN_CLASSPATH=%s", getRunDir()+"/lib/\"*\""), + format("export BROOKLYN_HOME=%s", getBrooklynHome()), + format(cmd) + ).failOnNonZeroResultCode().execute(); + } + + @Override + public boolean isRunning() { + Map<String,String> flags = ImmutableMap.of("usePidFile", getPidFile()); + int result = newScript(flags, CHECK_RUNNING).execute(); + return result == 0; + } + + @Override + public void stop() { + Map<String,String> flags = ImmutableMap.of("usePidFile", getPidFile()); + newScript(flags, STOPPING).execute(); + } + + @Override + public void kill() { + Map<String,String> flags = ImmutableMap.of("usePidFile", getPidFile()); + newScript(flags, KILLING).execute(); + } + + @Override + public Map<String, String> getShellEnvironment() { + Map<String, String> orig = super.getShellEnvironment(); + String origClasspath = orig.get("CLASSPATH"); + String newClasspath = (origClasspath == null ? "" : origClasspath+":") + + getRunDir()+"/conf/" + ":" + + getRunDir()+"/lib/\"*\""; + Map<String,String> results = new LinkedHashMap<String,String>(); + results.putAll(orig); + results.put("BROOKLYN_CLASSPATH", newClasspath); + results.put("BROOKLYN_HOME", getBrooklynHome()); + results.put("RUN", getRunDir()); + return results; + } + + private void uploadFileContents(String contents, String alternativeUri, String remotePath) { + checkNotNull(remotePath, "remotePath"); + SshMachineLocation machine = getMachine(); + String tempRemotePath = String.format("%s/upload.tmp", getRunDir()); + + if (contents == null && alternativeUri == null) { + throw new IllegalStateException("No contents supplied for file " + remotePath); + } + InputStream stream = contents != null + ? new ByteArrayInputStream(contents.getBytes()) + : resource.getResourceFromUrl(alternativeUri); + Map<String, String> flags = MutableMap.of(SshTool.PROP_PERMISSIONS.getName(), "0600"); + machine.copyTo(flags, stream, tempRemotePath); + newScript(CUSTOMIZING) + .failOnNonZeroResultCode() + .body.append( + format("mkdir -p %s", remotePath.subSequence(0, remotePath.lastIndexOf("/"))), + format("cp -p %s %s", tempRemotePath, remotePath), + format("rm -f %s", tempRemotePath)) + .execute(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/EntityHttpClient.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/EntityHttpClient.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/EntityHttpClient.java new file mode 100644 index 0000000..dd2db70 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/EntityHttpClient.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.brooklynnode; + +import java.util.Map; + +import org.apache.brooklyn.util.core.http.HttpTool; +import org.apache.brooklyn.util.core.http.HttpToolResponse; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Range; + +/** + * Helpful methods for making HTTP requests to {@link BrooklynNode} entities. + */ +public interface EntityHttpClient { + public static class ResponseCodePredicates { + private static class ResponseCodeHealthyPredicate implements Predicate<Integer> { + @Override + public boolean apply(Integer input) { + return HttpTool.isStatusCodeHealthy(input); + } + } + public static Predicate<Integer> informational() {return Range.closed(100, 199);} + public static Predicate<Integer> success() {return new ResponseCodeHealthyPredicate();} + public static Predicate<Integer> redirect() {return Range.closed(300, 399);} + public static Predicate<Integer> clientError() {return Range.closed(400, 499);} + public static Predicate<Integer> serverError() {return Range.closed(500, 599);} + public static Predicate<Integer> invalid() {return Predicates.or(Range.atMost(99), Range.atLeast(600));} + } + + /** + * @return An HTTP client builder configured to access the {@link + * BrooklynNode#WEB_CONSOLE_URI web console URI} at the + * given entity, or null if the entity has no URI. + */ + public HttpTool.HttpClientBuilder getHttpClientForBrooklynNode(); + + /** + * Configure which response codes are treated as successful + * @param successPredicate A predicate which returns true is the response code is acceptable + * @return this + */ + public EntityHttpClient responseSuccess(Predicate<Integer> responseSuccess); + + /** + * Makes an HTTP GET to a Brooklyn node entity. + * @param path Relative path to resource on server, e.g v1/catalog + * @return The server's response + */ + public HttpToolResponse get(String path); + + /** + * Makes an HTTP POST to a Brooklyn node entity. + * @param path Relative path to resource on server, e.g v1/catalog + * @param body byte array of serialized JSON to attach to the request + * @return The server's response + */ + public HttpToolResponse post(String path, Map<String, String> headers, byte[] body); + + /** + * Makes an HTTP POST to a Brooklyn node entity. + * @param path Relative path to resource on server, e.g v1/catalog + * @param formParams The parameters to send in a x-www-form-urlencoded format + * @return The server's response + */ + public HttpToolResponse post(String path, Map<String, String> headers, Map<String, String> formParams); + + /** + * Makes an HTTP DELETE to a Brooklyn node entity. + * @param path Relative path to resource on server, e.g v1/catalog + * @return The server's response + */ + public HttpToolResponse delete(String path, Map<String, String> headers); + +}
