http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/software/winrm/WindowsPerformanceCounterSensors.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/software/winrm/WindowsPerformanceCounterSensors.java b/software/base/src/main/java/brooklyn/entity/software/winrm/WindowsPerformanceCounterSensors.java deleted file mode 100644 index 05a0482..0000000 --- a/software/base/src/main/java/brooklyn/entity/software/winrm/WindowsPerformanceCounterSensors.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.software.winrm; - -import java.util.Map; -import java.util.Set; - -import org.apache.brooklyn.api.entity.EntityInitializer; -import org.apache.brooklyn.api.internal.EntityLocal; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.entity.core.EntityInternal; -import org.apache.brooklyn.sensor.core.Sensors; -import org.apache.brooklyn.sensor.feed.windows.WindowsPerformanceCounterFeed; -import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.text.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.reflect.TypeToken; - -public class WindowsPerformanceCounterSensors implements EntityInitializer { - - private static final Logger LOG = LoggerFactory.getLogger(WindowsPerformanceCounterSensors.class); - - public final static ConfigKey<Set<Map<String, String>>> PERFORMANCE_COUNTERS = ConfigKeys.newConfigKey(new TypeToken<Set<Map<String, String>>>(){}, "performance.counters"); - - protected final Set<Map<String, String>> sensors; - - public WindowsPerformanceCounterSensors(ConfigBag params) { - sensors = params.get(PERFORMANCE_COUNTERS); - } - - public WindowsPerformanceCounterSensors(Map<String, String> params) { - this(ConfigBag.newInstance(params)); - } - - @Override - public void apply(EntityLocal entity) { - WindowsPerformanceCounterFeed.Builder builder = WindowsPerformanceCounterFeed.builder() - .entity(entity); - for (Map<String, String> sensorConfig : sensors) { - String name = sensorConfig.get("name"); - String sensorType = sensorConfig.get("sensorType"); - Class<?> clazz; - try { - clazz = Strings.isNonEmpty(sensorType) - ? 
((EntityInternal)entity).getManagementContext().getCatalog().getRootClassLoader().loadClass(sensorType) - : String.class; - } catch (ClassNotFoundException e) { - throw new IllegalStateException("Could not load type "+sensorType+" for sensor "+name, e); - } - builder.addSensor(sensorConfig.get("counter"), Sensors.newSensor(clazz, name, sensorConfig.get("description"))); - } - builder.build(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxAttributePollConfig.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxAttributePollConfig.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxAttributePollConfig.java deleted file mode 100644 index ac8a27d..0000000 --- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxAttributePollConfig.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.event.feed.jmx; - -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.sensor.feed.PollConfig; - -import com.google.common.base.Function; -import com.google.common.base.Functions; - -public class JmxAttributePollConfig<T> extends PollConfig<Object, T, JmxAttributePollConfig<T>>{ - - private ObjectName objectName; - private String attributeName; - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public JmxAttributePollConfig(AttributeSensor<T> sensor) { - super(sensor); - onSuccess((Function)Functions.identity()); - } - - public JmxAttributePollConfig(JmxAttributePollConfig<T> other) { - super(other); - this.objectName = other.objectName; - this.attributeName = other.attributeName; - } - - public ObjectName getObjectName() { - return objectName; - } - - public String getAttributeName() { - return attributeName; - } - - public JmxAttributePollConfig<T> objectName(ObjectName val) { - this.objectName = val; return this; - } - - public JmxAttributePollConfig<T> objectName(String val) { - try { - return objectName(new ObjectName(val)); - } catch (MalformedObjectNameException e) { - throw new IllegalArgumentException("Invalid object name ("+val+")", e); - } - } - - public JmxAttributePollConfig<T> attributeName(String val) { - this.attributeName = val; return this; - } - - @Override protected String toStringBaseName() { return "jmx"; } - @Override protected String toStringPollSource() { return objectName+":"+attributeName; } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxFeed.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxFeed.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxFeed.java deleted file mode 100644 index 82e953b..0000000 
--- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxFeed.java +++ /dev/null @@ -1,437 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.event.feed.jmx; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -import javax.management.Notification; -import javax.management.NotificationFilter; -import javax.management.NotificationListener; -import javax.management.ObjectName; - -import org.apache.brooklyn.api.internal.EntityLocal; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.sensor.feed.AbstractFeed; -import org.apache.brooklyn.sensor.feed.AttributePollHandler; -import org.apache.brooklyn.sensor.feed.DelegatingPollHandler; -import org.apache.brooklyn.sensor.feed.PollHandler; -import org.apache.brooklyn.sensor.feed.Poller; -import org.apache.brooklyn.util.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.SoftwareProcessImpl; - -import com.google.common.collect.HashMultimap; -import 
com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; -import com.google.common.reflect.TypeToken; - - -/** - * Provides a feed of attribute values, by polling or subscribing over jmx. - * - * Example usage (e.g. in an entity that extends {@link SoftwareProcessImpl}): - * <pre> - * {@code - * private JmxFeed feed; - * - * //@Override - * protected void connectSensors() { - * super.connectSensors(); - * - * feed = JmxFeed.builder() - * .entity(this) - * .period(500, TimeUnit.MILLISECONDS) - * .pollAttribute(new JmxAttributePollConfig<Integer>(ERROR_COUNT) - * .objectName(requestProcessorMbeanName) - * .attributeName("errorCount")) - * .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP) - * .objectName(serverMbeanName) - * .attributeName("Started") - * .onError(Functions.constant(false))) - * .build(); - * } - * - * {@literal @}Override - * protected void disconnectSensors() { - * super.disconnectSensors(); - * if (feed != null) feed.stop(); - * } - * } - * </pre> - * - * @author aled - */ -public class JmxFeed extends AbstractFeed { - - public static final Logger log = LoggerFactory.getLogger(JmxFeed.class); - - public static final long JMX_CONNECTION_TIMEOUT_MS = 120*1000; - - public static final ConfigKey<JmxHelper> HELPER = ConfigKeys.newConfigKey(JmxHelper.class, "helper"); - public static final ConfigKey<Boolean> OWN_HELPER = ConfigKeys.newBooleanConfigKey("ownHelper"); - public static final ConfigKey<String> JMX_URI = ConfigKeys.newStringConfigKey("jmxUri"); - public static final ConfigKey<Long> JMX_CONNECTION_TIMEOUT = ConfigKeys.newLongConfigKey("jmxConnectionTimeout"); - - @SuppressWarnings("serial") - public static final ConfigKey<SetMultimap<String, JmxAttributePollConfig<?>>> ATTRIBUTE_POLLS = ConfigKeys.newConfigKey( - new TypeToken<SetMultimap<String, JmxAttributePollConfig<?>>>() {}, - "attributePolls"); - - 
@SuppressWarnings("serial") - public static final ConfigKey<SetMultimap<List<?>, JmxOperationPollConfig<?>>> OPERATION_POLLS = ConfigKeys.newConfigKey( - new TypeToken<SetMultimap<List<?>, JmxOperationPollConfig<?>>>() {}, - "operationPolls"); - - @SuppressWarnings("serial") - public static final ConfigKey<SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>>> NOTIFICATION_SUBSCRIPTIONS = ConfigKeys.newConfigKey( - new TypeToken<SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>>>() {}, - "notificationPolls"); - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - private EntityLocal entity; - private JmxHelper helper; - private long jmxConnectionTimeout = JMX_CONNECTION_TIMEOUT_MS; - private long period = 500; - private TimeUnit periodUnits = TimeUnit.MILLISECONDS; - private List<JmxAttributePollConfig<?>> attributePolls = Lists.newArrayList(); - private List<JmxOperationPollConfig<?>> operationPolls = Lists.newArrayList(); - private List<JmxNotificationSubscriptionConfig<?>> notificationSubscriptions = Lists.newArrayList(); - private String uniqueTag; - private volatile boolean built; - - public Builder entity(EntityLocal val) { - this.entity = val; - return this; - } - public Builder helper(JmxHelper val) { - this.helper = val; - return this; - } - public Builder period(Duration duration) { - return period(duration.toMilliseconds(), TimeUnit.MILLISECONDS); - } - public Builder period(long millis) { - return period(millis, TimeUnit.MILLISECONDS); - } - public Builder period(long val, TimeUnit units) { - this.period = val; - this.periodUnits = units; - return this; - } - public Builder pollAttribute(JmxAttributePollConfig<?> config) { - attributePolls.add(config); - return this; - } - public Builder pollOperation(JmxOperationPollConfig<?> config) { - operationPolls.add(config); - return this; - } - public Builder subscribeToNotification(JmxNotificationSubscriptionConfig<?> config) { - 
notificationSubscriptions.add(config); - return this; - } - public Builder uniqueTag(String uniqueTag) { - this.uniqueTag = uniqueTag; - return this; - } - public JmxFeed build() { - built = true; - JmxFeed result = new JmxFeed(this); - result.setEntity(checkNotNull(entity, "entity")); - result.start(); - return result; - } - @Override - protected void finalize() { - if (!built) log.warn("JmxFeed.Builder created, but build() never called"); - } - } - - private final SetMultimap<ObjectName, NotificationListener> notificationListeners = HashMultimap.create(); - - /** - * For rebind; do not call directly; use builder - */ - public JmxFeed() { - } - - protected JmxFeed(Builder builder) { - super(); - if (builder.helper != null) { - JmxHelper helper = builder.helper; - setConfig(HELPER, helper); - setConfig(OWN_HELPER, false); - setConfig(JMX_URI, helper.getUrl()); - } - setConfig(JMX_CONNECTION_TIMEOUT, builder.jmxConnectionTimeout); - - SetMultimap<String, JmxAttributePollConfig<?>> attributePolls = HashMultimap.<String,JmxAttributePollConfig<?>>create(); - for (JmxAttributePollConfig<?> config : builder.attributePolls) { - if (!config.isEnabled()) continue; - @SuppressWarnings({ "rawtypes", "unchecked" }) - JmxAttributePollConfig<?> configCopy = new JmxAttributePollConfig(config); - if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits); - attributePolls.put(configCopy.getObjectName().getCanonicalName() + configCopy.getAttributeName(), configCopy); - } - setConfig(ATTRIBUTE_POLLS, attributePolls); - - SetMultimap<List<?>, JmxOperationPollConfig<?>> operationPolls = HashMultimap.<List<?>,JmxOperationPollConfig<?>>create(); - for (JmxOperationPollConfig<?> config : builder.operationPolls) { - if (!config.isEnabled()) continue; - @SuppressWarnings({ "rawtypes", "unchecked" }) - JmxOperationPollConfig<?> configCopy = new JmxOperationPollConfig(config); - if (configCopy.getPeriod() < 0) configCopy.period(builder.period, 
builder.periodUnits); - operationPolls.put(configCopy.buildOperationIdentity(), configCopy); - } - setConfig(OPERATION_POLLS, operationPolls); - - SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>> notificationSubscriptions = HashMultimap.create(); - for (JmxNotificationSubscriptionConfig<?> config : builder.notificationSubscriptions) { - if (!config.isEnabled()) continue; - notificationSubscriptions.put(config.getNotificationFilter(), config); - } - setConfig(NOTIFICATION_SUBSCRIPTIONS, notificationSubscriptions); - initUniqueTag(builder.uniqueTag, attributePolls, operationPolls, notificationSubscriptions); - } - - @Override - public void setEntity(EntityLocal entity) { - if (getConfig(HELPER) == null) { - JmxHelper helper = new JmxHelper(entity); - setConfig(HELPER, helper); - setConfig(OWN_HELPER, true); - setConfig(JMX_URI, helper.getUrl()); - } - super.setEntity(entity); - } - - public String getJmxUri() { - return getConfig(JMX_URI); - } - - protected JmxHelper getHelper() { - return getConfig(HELPER); - } - - @SuppressWarnings("unchecked") - protected Poller<Object> getPoller() { - return (Poller<Object>) super.getPoller(); - } - - @Override - protected boolean isConnected() { - return super.isConnected() && getHelper().isConnected(); - } - - @Override - protected void preStart() { - /* - * All actions on the JmxHelper are done async (through the poller's threading) so we don't - * block on start for a long time (e.g. if the entity is not contactable and doing a rebind - * on restart of brooklyn). 
Without that, one gets a 120 second pause with it stuck in a - * stack trace like: - * - * at brooklyn.event.feed.jmx.JmxHelper.sleep(JmxHelper.java:640) - * at brooklyn.event.feed.jmx.JmxHelper.connect(JmxHelper.java:320) - * at brooklyn.event.feed.jmx.JmxFeed.preStart(JmxFeed.java:172) - * at brooklyn.event.feed.AbstractFeed.start(AbstractFeed.java:68) - * at brooklyn.event.feed.jmx.JmxFeed$Builder.build(JmxFeed.java:119) - * at brooklyn.entity.java.JavaAppUtils.connectMXBeanSensors(JavaAppUtils.java:109) - * at brooklyn.entity.java.VanillaJavaApp.connectSensors(VanillaJavaApp.java:97) - * at brooklyn.entity.basic.SoftwareProcessImpl.callRebindHooks(SoftwareProcessImpl.java:189) - * at brooklyn.entity.basic.SoftwareProcessImpl.rebind(SoftwareProcessImpl.java:235) - * ... - * at brooklyn.entity.rebind.RebindManagerImpl.rebind(RebindManagerImpl.java:184) - */ - final SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>> notificationSubscriptions = getConfig(NOTIFICATION_SUBSCRIPTIONS); - final SetMultimap<List<?>, JmxOperationPollConfig<?>> operationPolls = getConfig(OPERATION_POLLS); - final SetMultimap<String, JmxAttributePollConfig<?>> attributePolls = getConfig(ATTRIBUTE_POLLS); - - getPoller().submit(new Callable<Void>() { - public Void call() { - getHelper().connect(getConfig(JMX_CONNECTION_TIMEOUT)); - return null; - } - @Override public String toString() { return "Connect JMX "+getHelper().getUrl(); } - }); - - for (final NotificationFilter filter : notificationSubscriptions.keySet()) { - getPoller().submit(new Callable<Void>() { - public Void call() { - // TODO Could config.getObjectName have wildcards? Is this code safe? 
- Set<JmxNotificationSubscriptionConfig<?>> configs = notificationSubscriptions.get(filter); - NotificationListener listener = registerNotificationListener(configs); - ObjectName objectName = Iterables.get(configs, 0).getObjectName(); - notificationListeners.put(objectName, listener); - return null; - } - @Override public String toString() { return "Register JMX notifications: "+notificationSubscriptions.get(filter); } - }); - } - - // Setup polling of sensors - for (final String jmxAttributeName : attributePolls.keys()) { - registerAttributePoller(attributePolls.get(jmxAttributeName)); - } - - // Setup polling of operations - for (final List<?> operationIdentifier : operationPolls.keys()) { - registerOperationPoller(operationPolls.get(operationIdentifier)); - } - } - - @Override - protected void preStop() { - super.preStop(); - - for (Map.Entry<ObjectName, NotificationListener> entry : notificationListeners.entries()) { - unregisterNotificationListener(entry.getKey(), entry.getValue()); - } - notificationListeners.clear(); - } - - @Override - protected void postStop() { - super.postStop(); - JmxHelper helper = getHelper(); - Boolean ownHelper = getConfig(OWN_HELPER); - if (helper != null && ownHelper) helper.terminate(); - } - - /** - * Registers to poll a jmx-operation for an ObjectName, where all the given configs are for the same ObjectName + operation + parameters. - */ - private void registerOperationPoller(Set<JmxOperationPollConfig<?>> configs) { - Set<AttributePollHandler<? 
super Object>> handlers = Sets.newLinkedHashSet(); - long minPeriod = Integer.MAX_VALUE; - - final ObjectName objectName = Iterables.get(configs, 0).getObjectName(); - final String operationName = Iterables.get(configs, 0).getOperationName(); - final List<String> signature = Iterables.get(configs, 0).getSignature(); - final List<?> params = Iterables.get(configs, 0).getParams(); - - for (JmxOperationPollConfig<?> config : configs) { - handlers.add(new AttributePollHandler<Object>(config, getEntity(), this)); - if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); - } - - getPoller().scheduleAtFixedRate( - new Callable<Object>() { - public Object call() throws Exception { - if (log.isDebugEnabled()) log.debug("jmx operation polling for {} sensors at {} -> {}", new Object[] {getEntity(), getJmxUri(), operationName}); - if (signature.size() == params.size()) { - return getHelper().operation(objectName, operationName, signature, params); - } else { - return getHelper().operation(objectName, operationName, params.toArray()); - } - } - }, - new DelegatingPollHandler<Object>(handlers), minPeriod); - } - - /** - * Registers to poll a jmx-attribute for an ObjectName, where all the given configs are for that same ObjectName + attribute. - */ - private void registerAttributePoller(Set<JmxAttributePollConfig<?>> configs) { - Set<AttributePollHandler<? 
super Object>> handlers = Sets.newLinkedHashSet(); - long minPeriod = Integer.MAX_VALUE; - - final ObjectName objectName = Iterables.get(configs, 0).getObjectName(); - final String jmxAttributeName = Iterables.get(configs, 0).getAttributeName(); - - for (JmxAttributePollConfig<?> config : configs) { - handlers.add(new AttributePollHandler<Object>(config, getEntity(), this)); - if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); - } - - // TODO Not good calling this holding the synchronization lock - getPoller().scheduleAtFixedRate( - new Callable<Object>() { - public Object call() throws Exception { - if (log.isTraceEnabled()) log.trace("jmx attribute polling for {} sensors at {} -> {}", new Object[] {getEntity(), getJmxUri(), jmxAttributeName}); - return getHelper().getAttribute(objectName, jmxAttributeName); - } - }, - new DelegatingPollHandler<Object>(handlers), minPeriod); - } - - /** - * Registers to subscribe to notifications for an ObjectName, where all the given configs are for that same ObjectName + filter. - */ - private NotificationListener registerNotificationListener(Set<JmxNotificationSubscriptionConfig<?>> configs) { - final List<AttributePollHandler<? 
super javax.management.Notification>> handlers = Lists.newArrayList(); - - final ObjectName objectName = Iterables.get(configs, 0).getObjectName(); - final NotificationFilter filter = Iterables.get(configs, 0).getNotificationFilter(); - - for (final JmxNotificationSubscriptionConfig<?> config : configs) { - AttributePollHandler<javax.management.Notification> handler = new AttributePollHandler<javax.management.Notification>(config, getEntity(), this) { - @Override protected Object transformValueOnSuccess(javax.management.Notification val) { - if (config.getOnNotification() != null) { - return config.getOnNotification().apply(val); - } else { - Object result = super.transformValueOnSuccess(val); - if (result instanceof javax.management.Notification) - return ((javax.management.Notification)result).getUserData(); - return result; - } - } - }; - handlers.add(handler); - } - final PollHandler<javax.management.Notification> compoundHandler = new DelegatingPollHandler<javax.management.Notification>(handlers); - - NotificationListener listener = new NotificationListener() { - @Override public void handleNotification(Notification notification, Object handback) { - compoundHandler.onSuccess(notification); - } - }; - getHelper().addNotificationListener(objectName, listener, filter); - - return listener; - } - - private void unregisterNotificationListener(ObjectName objectName, NotificationListener listener) { - try { - getHelper().removeNotificationListener(objectName, listener); - } catch (RuntimeException e) { - log.warn("Failed to unregister listener: "+objectName+", "+listener+"; continuing...", e); - } - } - - @Override - public String toString() { - return "JmxFeed["+(getManagementContext()!=null&&getManagementContext().isRunning()?getJmxUri():"mgmt-not-running")+"]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxHelper.java 
---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxHelper.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxHelper.java deleted file mode 100644 index cfc405e..0000000 --- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxHelper.java +++ /dev/null @@ -1,725 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.event.feed.jmx; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth; -import groovy.time.TimeDuration; - -import java.io.IOException; -import java.security.KeyStore; -import java.security.PrivateKey; -import java.security.cert.Certificate; -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; -import java.util.WeakHashMap; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import javax.management.AttributeNotFoundException; -import javax.management.InstanceAlreadyExistsException; -import javax.management.InstanceNotFoundException; -import javax.management.InvalidAttributeValueException; -import javax.management.JMX; -import javax.management.ListenerNotFoundException; -import javax.management.MBeanServerConnection; -import javax.management.MalformedObjectNameException; -import javax.management.NotCompliantMBeanException; -import javax.management.NotificationFilter; -import javax.management.NotificationListener; -import javax.management.ObjectInstance; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularData; -import javax.management.remote.JMXConnector; -import javax.management.remote.JMXConnectorFactory; -import javax.management.remote.JMXServiceURL; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocketFactory; -import javax.net.ssl.TrustManager; - -import org.apache.brooklyn.api.internal.EntityLocal; -import org.apache.brooklyn.util.core.crypto.SecureKeys; -import org.apache.brooklyn.util.crypto.SslTrustUtils; -import org.apache.brooklyn.util.exceptions.Exceptions; -import 
org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; -import org.apache.brooklyn.util.jmx.jmxmp.JmxmpAgent; -import org.apache.brooklyn.util.repeat.Repeater; -import org.apache.brooklyn.util.time.Duration; -import org.apache.brooklyn.util.time.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.java.JmxSupport; -import brooklyn.entity.java.UsesJmx; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; - -public class JmxHelper { - - private static final Logger LOG = LoggerFactory.getLogger(JmxHelper.class); - - public static final String JMX_URL_FORMAT = "service:jmx:rmi:///jndi/rmi://%s:%d/%s"; - // first host:port may be ignored, so above is sufficient, but not sure - public static final String RMI_JMX_URL_FORMAT = "service:jmx:rmi://%s:%d/jndi/rmi://%s:%d/%s"; - // jmxmp - public static final String JMXMP_URL_FORMAT = "service:jmx:jmxmp://%s:%d"; - - // Tracks the MBeans we have failed to find, with a set keyed off the url - private static final Map<String, Set<ObjectName>> notFoundMBeansByUrl = Collections.synchronizedMap(new WeakHashMap<String, Set<ObjectName>>()); - - public static final Map<String, String> CLASSES = ImmutableMap.<String,String>builder() - .put("Integer", Integer.TYPE.getName()) - .put("Long", Long.TYPE.getName()) - .put("Boolean", Boolean.TYPE.getName()) - .put("Byte", Byte.TYPE.getName()) - .put("Character", Character.TYPE.getName()) - .put("Double", Double.TYPE.getName()) - .put("Float", Float.TYPE.getName()) - .put("GStringImpl", String.class.getName()) - .put("LinkedHashMap", Map.class.getName()) - .put("TreeMap", Map.class.getName()) - .put("HashMap", Map.class.getName()) - .put("ConcurrentHashMap", Map.class.getName()) - .put("TabularDataSupport", TabularData.class.getName()) - .put("CompositeDataSupport", 
CompositeData.class.getName()) - .build(); - - /** constructs a JMX URL suitable for connecting to the given entity, being smart about JMX/RMI vs JMXMP */ - public static String toJmxUrl(EntityLocal entity) { - String url = entity.getAttribute(UsesJmx.JMX_URL); - if (url != null) { - return url; - } else { - new JmxSupport(entity, null).setJmxUrl(); - url = entity.getAttribute(UsesJmx.JMX_URL); - return Preconditions.checkNotNull(url, "Could not find URL for "+entity); - } - } - - /** constructs an RMI/JMX URL with the given inputs - * (where the RMI Registry Port should be non-null, and at least one must be non-null) */ - public static String toRmiJmxUrl(String host, Integer jmxRmiServerPort, Integer rmiRegistryPort, String context) { - if (rmiRegistryPort != null && rmiRegistryPort > 0) { - if (jmxRmiServerPort!=null && jmxRmiServerPort > 0 && jmxRmiServerPort!=rmiRegistryPort) { - // we have an explicit known JMX RMI server port (e.g. because we are using the agent), - // distinct from the RMI registry port - // (if the ports are the same, it is a short-hand, and don't use this syntax!) - return String.format(RMI_JMX_URL_FORMAT, host, jmxRmiServerPort, host, rmiRegistryPort, context); - } - return String.format(JMX_URL_FORMAT, host, rmiRegistryPort, context); - } else if (jmxRmiServerPort!=null && jmxRmiServerPort > 0) { - LOG.warn("No RMI registry port set for "+host+"; attempting to use JMX port for RMI lookup"); - return String.format(JMX_URL_FORMAT, host, jmxRmiServerPort, context); - } else { - LOG.warn("No RMI/JMX details set for "+host+"; returning null"); - return null; - } - } - - /** constructs a JMXMP URL for connecting to the given host and port */ - public static String toJmxmpUrl(String host, Integer jmxmpPort) { - return "service:jmx:jmxmp://"+host+(jmxmpPort!=null ? 
":"+jmxmpPort : ""); - } - - final EntityLocal entity; - final String url; - final String user; - final String password; - - private volatile transient JMXConnector connector; - private volatile transient MBeanServerConnection connection; - private transient boolean triedConnecting; - private transient boolean failedReconnecting; - private transient long failedReconnectingTime; - private int minTimeBetweenReconnectAttempts = 1000; - private final AtomicBoolean terminated = new AtomicBoolean(); - - // Tracks the MBeans we have failed to find for this JmsHelper's connection URL (so can log just once for each) - private final Set<ObjectName> notFoundMBeans; - - public JmxHelper(EntityLocal entity) { - this(toJmxUrl(entity), entity, entity.getAttribute(UsesJmx.JMX_USER), entity.getAttribute(UsesJmx.JMX_PASSWORD)); - - if (entity.getAttribute(UsesJmx.JMX_URL) == null) { - entity.setAttribute(UsesJmx.JMX_URL, url); - } - } - - // TODO split this in to two classes, one for entities, and one entity-neutral - // (simplifying set of constructors below) - - public JmxHelper(String url) { - this(url, null, null); - } - - public JmxHelper(String url, String user, String password) { - this(url, null, user, password); - } - - public JmxHelper(String url, EntityLocal entity, String user, String password) { - this.url = url; - this.entity = entity; - this.user = user; - this.password = password; - - synchronized (notFoundMBeansByUrl) { - Set<ObjectName> set = notFoundMBeansByUrl.get(url); - if (set == null) { - set = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<ObjectName, Boolean>())); - notFoundMBeansByUrl.put(url, set); - } - notFoundMBeans = set; - } - } - - public void setMinTimeBetweenReconnectAttempts(int val) { - minTimeBetweenReconnectAttempts = val; - } - - public String getUrl(){ - return url; - } - - // ============== connection related calls ======================= - - //for tesing purposes - protected MBeanServerConnection getConnection() { - 
return connection; - } - - /** - * Checks if the JmxHelper is connected. Returned value could be stale as soon - * as it is received. - * - * This method is thread safe. - * - * @return true if connected, false otherwise. - */ - public boolean isConnected() { - return connection!=null; - } - - /** - * Reconnects. If it already is connected, it disconnects first. - * - * @throws IOException - */ - public synchronized void reconnectWithRetryDampened() throws IOException { - // If we've already tried reconnecting very recently, don't try again immediately - if (failedReconnecting) { - long timeSince = (System.currentTimeMillis() - failedReconnectingTime); - if (timeSince < minTimeBetweenReconnectAttempts) { - String msg = "Not reconnecting to JMX at "+url+" because attempt failed "+Time.makeTimeStringRounded(timeSince)+" ago"; - throw new IllegalStateException(msg); - } - } - - reconnect(); - } - - public synchronized void reconnect() throws IOException { - disconnect(); - - try { - connect(); - failedReconnecting = false; - } catch (Exception e) { - if (failedReconnecting) { - if (LOG.isDebugEnabled()) LOG.debug("unable to re-connect to JMX url (repeated failure): {}: {}", url, e); - } else { - LOG.debug("unable to re-connect to JMX url {} (rethrowing): {}", url, e); - failedReconnecting = true; - } - failedReconnectingTime = System.currentTimeMillis(); - throw Throwables.propagate(e); - } - } - - /** attempts to connect immediately */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - public synchronized void connect() throws IOException { - if (terminated.get()) throw new IllegalStateException("JMX Helper "+this+" already terminated"); - if (connection != null) return; - - triedConnecting = true; - if (connector != null) connector.close(); - JMXServiceURL serviceUrl = new JMXServiceURL(url); - Map env = getConnectionEnvVars(); - try { - connector = JMXConnectorFactory.connect(serviceUrl, env); - } catch (NullPointerException npe) { - //some software -- eg WSO2 -- 
will throw an NPE exception if the JMX connection can't be created, instead of an IOException. - //this is a break of contract with the JMXConnectorFactory.connect method, so this code verifies if the NPE is - //thrown by a known offender (wso2) and if so replaces the bad exception by a new IOException. - //ideally WSO2 will fix this bug and we can remove this code. - boolean thrownByWso2 = npe.getStackTrace()[0].toString().contains("org.wso2.carbon.core.security.CarbonJMXAuthenticator.authenticate"); - if (thrownByWso2) { - throw new IOException("Failed to connect to url "+url+". NullPointerException is thrown, but replaced by an IOException to fix a WSO2 JMX problem", npe); - } else { - throw npe; - } - } catch (IOException e) { - Exceptions.propagateIfFatal(e); - if (terminated.get()) { - throw new IllegalStateException("JMX Helper "+this+" already terminated", e); - } else { - throw e; - } - } - connection = connector.getMBeanServerConnection(); - - if (terminated.get()) { - disconnectNow(); - throw new IllegalStateException("JMX Helper "+this+" already terminated"); - } - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public Map getConnectionEnvVars() { - Map env = new LinkedHashMap(); - - if (groovyTruth(user) && groovyTruth(password)) { - String[] creds = new String[] {user, password}; - env.put(JMXConnector.CREDENTIALS, creds); - } - - if (entity!=null && groovyTruth(entity.getConfig(UsesJmx.JMX_SSL_ENABLED))) { - env.put("jmx.remote.profiles", JmxmpAgent.TLS_JMX_REMOTE_PROFILES); - - PrivateKey key = entity.getConfig(UsesJmx.JMX_SSL_ACCESS_KEY); - Certificate cert = entity.getConfig(UsesJmx.JMX_SSL_ACCESS_CERT); - KeyStore ks = SecureKeys.newKeyStore(); - try { - KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - if (key!=null) { - ks.setKeyEntry("brooklyn-jmx-access", key, "".toCharArray(), new Certificate[] { cert }); - } - kmf.init(ks, "".toCharArray()); - - TrustManager tms = - // TODO use root 
cert for trusting server - //trustStore!=null ? SecureKeys.getTrustManager(trustStore) : - SslTrustUtils.TRUST_ALL; - - SSLContext ctx = SSLContext.getInstance("TLSv1"); - ctx.init(kmf.getKeyManagers(), new TrustManager[] { tms }, null); - SSLSocketFactory ssf = ctx.getSocketFactory(); - env.put(JmxmpAgent.TLS_SOCKET_FACTORY_PROPERTY, ssf); - - } catch (Exception e) { - LOG.warn("Error setting key "+key+" for "+entity+": "+e, e); - } - } - - return env; - } - - /** - * Continuously attempts to connect for at least the indicated amount of time; or indefinitely if -1. This method - * is useful when you are not sure if the system you are trying to connect to already is up and running. - * - * This method doesn't throw an Exception, but returns true on success, false otherwise. - * - * TODO: What happens if already connected? - * - * @param timeoutMs - * @return - */ - public boolean connect(long timeoutMs) { - if (LOG.isDebugEnabled()) LOG.debug("Connecting to JMX URL: {} ({})", url, ((timeoutMs == -1) ? "indefinitely" : timeoutMs+"ms timeout")); - long startMs = System.currentTimeMillis(); - long endMs = (timeoutMs == -1) ? 
Long.MAX_VALUE : (startMs + timeoutMs); - long currentTime = startMs; - Throwable lastError = null; - int attempt = 0; - while (currentTime <= endMs) { - currentTime = System.currentTimeMillis(); - if (attempt != 0) sleep(100); //sleep 100 to prevent thrashing and facilitate interruption - if (LOG.isTraceEnabled()) LOG.trace("trying connection to {} at time {}", url, currentTime); - - try { - connect(); - return true; - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - if (!terminated.get() && shouldRetryOn(e)) { - if (LOG.isDebugEnabled()) LOG.debug("Attempt {} failed connecting to {} ({})", new Object[] {attempt + 1, url, e.getMessage()}); - lastError = e; - } else { - throw Exceptions.propagate(e); - } - } - attempt++; - } - LOG.warn("unable to connect to JMX url: "+url, lastError); - return false; - } - - private boolean shouldRetryOn(Exception e) { - // Expect SecurityException, IOException, etc. - // But can also see things like javax.naming.ServiceUnavailableException with WSO2 app-servers. - // So let's not try to second guess strange behaviours that future entities will exhibit. - // - // However, if it was our request that was invalid then not worth retrying. - - if (e instanceof AttributeNotFoundException) return false; - if (e instanceof InstanceAlreadyExistsException) return false; - if (e instanceof InstanceNotFoundException) return false; - if (e instanceof InvalidAttributeValueException) return false; - if (e instanceof ListenerNotFoundException) return false; - if (e instanceof MalformedObjectNameException) return false; - if (e instanceof NotCompliantMBeanException) return false; - if (e instanceof InterruptedException) return false; - if (e instanceof RuntimeInterruptedException) return false; - - return true; - } - - /** - * A thread-safe version of {@link #disconnectNow()}. - * - * This method is threadsafe. 
- */ - public synchronized void disconnect() { - disconnectNow(); - } - - /** - * Disconnects, preventing subsequent connections to be made. Method doesn't throw an exception. - * - * Can safely be called if already disconnected. - * - * This method is not threadsafe, but will thus not block if - * another thread is taking a long time for connections to timeout. - * - * Any concurrent requests will likely get an IOException - see - * {@linkplain http://docs.oracle.com/javase/7/docs/api/javax/management/remote/JMXConnector.html#close()}. - * - */ - public void terminate() { - terminated.set(true); - disconnectNow(); - } - - protected void disconnectNow() { - triedConnecting = false; - if (connector != null) { - if (LOG.isDebugEnabled()) LOG.debug("Disconnecting from JMX URL {}", url); - try { - connector.close(); - } catch (Exception e) { - // close attempts to connect to close cleanly; and if it can't, it throws; - // often we disconnect as part of shutdown, even if the other side has already stopped -- - // so swallow exceptions (no situations known where we need a clean closure on the remote side) - if (LOG.isDebugEnabled()) LOG.debug("Caught exception disconnecting from JMX at {} ({})", url, e.getMessage()); - if (LOG.isTraceEnabled()) LOG.trace("Details for exception disconnecting JMX", e); - } finally { - connector = null; - connection = null; - } - } - } - - /** - * Gets a usable MBeanServerConnection. - * - * Method is threadsafe. - * - * @returns the MBeanServerConnection - * @throws IllegalStateException if not connected. - */ - private synchronized MBeanServerConnection getConnectionOrFail() { - if (isConnected()) - return getConnection(); - - if (triedConnecting) { - throw new IllegalStateException("Failed to connect to JMX at "+url); - } else { - String msg = "Not connected (and not attempted to connect) to JMX at "+url+ - (failedReconnecting ? 
(" (last reconnect failure at "+ Time.makeDateString(failedReconnectingTime) + ")") : ""); - throw new IllegalStateException(msg); - } - } - - private <T> T invokeWithReconnect(Callable<T> task) { - try { - return task.call(); - } catch (Exception e) { - if (shouldRetryOn(e)) { - try { - reconnectWithRetryDampened(); - return task.call(); - } catch (Exception e2) { - throw Throwables.propagate(e2); - } - } else { - throw Throwables.propagate(e); - } - } - } - - // ====================== query related calls ======================================= - - /** - * Converts from an object name pattern to a real object name, by querying with findMBean; - * if no matching MBean can be found (or if more than one match found) then returns null. - * If the supplied object name is not a pattern then just returns that. If the - */ - public ObjectName toLiteralObjectName(ObjectName objectName) { - if (checkNotNull(objectName, "objectName").isPattern()) { - ObjectInstance bean = findMBean(objectName); - return (bean != null) ? 
bean.getObjectName() : null; - } else { - return objectName; - } - } - - public Set<ObjectInstance> findMBeans(final ObjectName objectName) { - return invokeWithReconnect(new Callable<Set<ObjectInstance>>() { - public Set<ObjectInstance> call() throws Exception { - return getConnectionOrFail().queryMBeans(objectName, null); - }}); - } - - public ObjectInstance findMBean(ObjectName objectName) { - Set<ObjectInstance> beans = findMBeans(objectName); - if (beans.size() == 1) { - notFoundMBeans.remove(objectName); - return Iterables.getOnlyElement(beans); - } else { - boolean changed = notFoundMBeans.add(objectName); - - if (beans.size() > 1) { - if (changed) { - LOG.warn("JMX object name query returned {} values for {} at {}; ignoring all", - new Object[] {beans.size(), objectName.getCanonicalName(), url}); - } else { - if (LOG.isDebugEnabled()) LOG.debug("JMX object name query returned {} values for {} at {} (repeating); ignoring all", - new Object[] {beans.size(), objectName.getCanonicalName(), url}); - } - } else { - if (changed) { - LOG.warn("JMX object {} not found at {}", objectName.getCanonicalName(), url); - } else { - if (LOG.isDebugEnabled()) LOG.debug("JMX object {} not found at {} (repeating)", objectName.getCanonicalName(), url); - } - } - return null; - } - } - - public Set<ObjectInstance> doesMBeanExistsEventually(final ObjectName objectName, Duration timeout) { - return doesMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS); - } - public Set<ObjectInstance> doesMBeanExistsEventually(final ObjectName objectName, TimeDuration timeout) { - return doesMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS); - } - - public Set<ObjectInstance> doesMBeanExistsEventually(final ObjectName objectName, long timeoutMillis) { - return doesMBeanExistsEventually(objectName, timeoutMillis, TimeUnit.MILLISECONDS); - } - - public Set<ObjectInstance> doesMBeanExistsEventually(String objectName, Duration timeout) 
{ - return doesMBeanExistsEventually(createObjectName(objectName), timeout); - } - public Set<ObjectInstance> doesMBeanExistsEventually(String objectName, TimeDuration timeout) { - return doesMBeanExistsEventually(createObjectName(objectName), timeout); - } - - public Set<ObjectInstance> doesMBeanExistsEventually(String objectName, long timeout, TimeUnit timeUnit) { - return doesMBeanExistsEventually(createObjectName(objectName), timeout, timeUnit); - } - - /** returns set of beans found, with retry, empty set if none after timeout */ - public Set<ObjectInstance> doesMBeanExistsEventually(final ObjectName objectName, long timeout, TimeUnit timeUnit) { - final long timeoutMillis = timeUnit.toMillis(timeout); - final AtomicReference<Set<ObjectInstance>> beans = new AtomicReference<Set<ObjectInstance>>(ImmutableSet.<ObjectInstance>of()); - try { - Repeater.create("Wait for "+objectName) - .limitTimeTo(timeout, timeUnit) - .every(500, TimeUnit.MILLISECONDS) - .until(new Callable<Boolean>() { - public Boolean call() { - connect(timeoutMillis); - beans.set(findMBeans(objectName)); - return !beans.get().isEmpty(); - }}) - .rethrowException() - .run(); - return beans.get(); - } catch (Exception e) { - throw Exceptions.propagate(e); - } - } - - public void assertMBeanExistsEventually(ObjectName objectName, Duration timeout) { - assertMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS); - } - public void assertMBeanExistsEventually(ObjectName objectName, TimeDuration timeout) { - assertMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS); - } - - public void assertMBeanExistsEventually(ObjectName objectName, long timeoutMillis) { - assertMBeanExistsEventually(objectName, timeoutMillis, TimeUnit.MILLISECONDS); - } - - public void assertMBeanExistsEventually(ObjectName objectName, long timeout, TimeUnit timeUnit) { - Set<ObjectInstance> beans = doesMBeanExistsEventually(objectName, timeout, timeUnit); - if 
(beans.size() != 1) { - throw new IllegalStateException("MBean "+objectName+" not found within "+timeout+ - (beans.size() > 1 ? "; found multiple matches: "+beans : "")); - } - } - - /** - * Returns a specific attribute for a JMX {@link ObjectName}. - */ - public Object getAttribute(ObjectName objectName, final String attribute) { - final ObjectName realObjectName = toLiteralObjectName(objectName); - - if (realObjectName != null) { - Object result = invokeWithReconnect(new Callable<Object>() { - public Object call() throws Exception { - return getConnectionOrFail().getAttribute(realObjectName, attribute); - }}); - - if (LOG.isTraceEnabled()) LOG.trace("From {}, for jmx attribute {}.{}, got value {}", new Object[] {url, objectName.getCanonicalName(), attribute, result}); - return result; - } else { - return null; - } - } - - public void setAttribute(String objectName, String attribute, Object val) { - setAttribute(createObjectName(objectName), attribute, val); - } - - public void setAttribute(ObjectName objectName, final String attribute, final Object val) { - final ObjectName realObjectName = toLiteralObjectName(objectName); - - if (realObjectName != null) { - invokeWithReconnect(new Callable<Void>() { - public Void call() throws Exception { - getConnectionOrFail().setAttribute(realObjectName, new javax.management.Attribute(attribute, val)); - return null; - }}); - if (LOG.isTraceEnabled()) LOG.trace("From {}, for jmx attribute {}.{}, set value {}", new Object[] {url, objectName.getCanonicalName(), attribute, val}); - } else { - if (LOG.isDebugEnabled()) LOG.debug("From {}, cannot set attribute {}.{}, because mbean not found", new Object[] {url, objectName.getCanonicalName(), attribute}); - } - } - - /** @see #operation(ObjectName, String, Object ...) */ - public Object operation(String objectName, String method, Object... 
arguments) { - return operation(createObjectName(objectName), method, arguments); - } - - /** - * Executes an operation on a JMX {@link ObjectName}. - */ - public Object operation(ObjectName objectName, final String method, final Object... arguments) { - final ObjectName realObjectName = toLiteralObjectName(objectName); - final String[] signature = new String[arguments.length]; - for (int i = 0; i < arguments.length; i++) { - Class<?> clazz = arguments[i].getClass(); - signature[i] = (CLASSES.containsKey(clazz.getSimpleName()) ? CLASSES.get(clazz.getSimpleName()) : clazz.getName()); - } - - Object result = invokeWithReconnect(new Callable<Object>() { - public Object call() throws Exception { - return getConnectionOrFail().invoke(realObjectName, method, arguments, signature); - }}); - - if (LOG.isTraceEnabled()) LOG.trace("From {}, for jmx operation {}.{}({}), got value {}", new Object[] {url, realObjectName.getCanonicalName(), method, Arrays.asList(arguments), - result}); - return result; - } - - public void addNotificationListener(String objectName, NotificationListener listener) { - addNotificationListener(createObjectName(objectName), listener, null); - } - - public void addNotificationListener(String objectName, NotificationListener listener, NotificationFilter filter) { - addNotificationListener(createObjectName(objectName), listener, filter); - } - - public void addNotificationListener(ObjectName objectName, NotificationListener listener) { - addNotificationListener(objectName, listener, null); - } - - public void addNotificationListener(final ObjectName objectName, final NotificationListener listener, final NotificationFilter filter) { - invokeWithReconnect(new Callable<Void>() { - public Void call() throws Exception { - getConnectionOrFail().addNotificationListener(objectName, listener, filter, null); - return null; - }}); - } - - public void removeNotificationListener(String objectName, NotificationListener listener) { - 
removeNotificationListener(createObjectName(objectName), listener); - } - - public void removeNotificationListener(final ObjectName objectName, final NotificationListener listener) { - removeNotificationListener(objectName, listener, null); - } - - public void removeNotificationListener(final ObjectName objectName, final NotificationListener listener, final NotificationFilter filter) { - if (isConnected()) invokeWithReconnect(new Callable<Void>() { - public Void call() throws Exception { - getConnectionOrFail().removeNotificationListener(objectName, listener, filter, null); - return null; - }}); - } - - public <M> M getProxyObject(String objectName, Class<M> mbeanInterface) { - return getProxyObject(createObjectName(objectName), mbeanInterface); - } - - public <M> M getProxyObject(ObjectName objectName, Class<M> mbeanInterface) { - MBeanServerConnection connection = getConnectionOrFail(); - return JMX.newMBeanProxy(connection, objectName, mbeanInterface, false); - } - - public static ObjectName createObjectName(String name) { - try { - return new ObjectName(name); - } catch (MalformedObjectNameException e) { - throw Throwables.propagate(e); - } - } - - private static void sleep(long sleepTimeMillis) { - try { - Thread.sleep(sleepTimeMillis); - } catch (InterruptedException e) { - throw new RuntimeInterruptedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationFilters.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationFilters.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationFilters.java deleted file mode 100644 index 8cf5d62..0000000 --- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationFilters.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.event.feed.jmx; - -import javax.management.Notification; -import javax.management.NotificationFilter; -import javax.management.NotificationFilterSupport; - -public class JmxNotificationFilters { - - private JmxNotificationFilters() {} // instead use static utility methods - - /** - * Matches the given notification type. - * @see {@link NotificationFilterSupport#enableType(String)} - */ - public static NotificationFilter matchesType(String type) { - return matchesTypes(type); - } - - /** - * Matches any of the given notification types. - * @see {@link NotificationFilterSupport#enableType(String)} - */ - public static NotificationFilter matchesTypes(String... types) { - NotificationFilterSupport result = new NotificationFilterSupport(); - for (String type : types) { - result.enableType(type); - } - return result; - } - - /** - * @deprecated since 0.6.0; - * only works if this brooklyn class is on the classpath of the JVM that your - * subscribing to notifications on (because it tries to push the filter instance - * to that JVM). So of very limited use in real-world java processes to be managed. - * Therefore this will be deleted to avoid people hitting this surprising behaviour. 
- */ - @SuppressWarnings("serial") - public static NotificationFilter matchesTypeRegex(final String typeRegex) { - return new NotificationFilter() { - @Override public boolean isNotificationEnabled(Notification notif) { - return notif.getType().matches(typeRegex); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationSubscriptionConfig.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationSubscriptionConfig.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationSubscriptionConfig.java deleted file mode 100644 index b27ffef..0000000 --- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationSubscriptionConfig.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.event.feed.jmx; - -import javax.management.MalformedObjectNameException; -import javax.management.Notification; -import javax.management.NotificationFilter; -import javax.management.ObjectName; - -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.sensor.feed.FeedConfig; -import org.apache.brooklyn.util.collections.MutableList; - -import com.google.common.base.Function; -import com.google.common.base.Functions; - -public class JmxNotificationSubscriptionConfig<T> extends FeedConfig<javax.management.Notification, T, JmxNotificationSubscriptionConfig<T>>{ - - private ObjectName objectName; - private NotificationFilter notificationFilter; - private Function<Notification, T> onNotification; - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public JmxNotificationSubscriptionConfig(AttributeSensor<T> sensor) { - super(sensor); - onSuccess((Function)Functions.identity()); - } - - public JmxNotificationSubscriptionConfig(JmxNotificationSubscriptionConfig<T> other) { - super(other); - this.objectName = other.objectName; - this.notificationFilter = other.notificationFilter; - this.onNotification = other.onNotification; - } - - public ObjectName getObjectName() { - return objectName; - } - - public NotificationFilter getNotificationFilter() { - return notificationFilter; - } - - public Function<Notification, T> getOnNotification() { - return onNotification; - } - - public JmxNotificationSubscriptionConfig<T> objectName(ObjectName val) { - this.objectName = val; return this; - } - - public JmxNotificationSubscriptionConfig<T> objectName(String val) { - try { - return objectName(new ObjectName(val)); - } catch (MalformedObjectNameException e) { - throw new IllegalArgumentException("Invalid object name ("+val+")", e); - } - } - - public JmxNotificationSubscriptionConfig<T> notificationFilter(NotificationFilter val) { - this.notificationFilter = val; return this; - } - - public JmxNotificationSubscriptionConfig<T> 
onNotification(Function<Notification,T> val) { - this.onNotification = val; return this; - } - - @Override - protected Object toStringPollSource() { - return objectName; - } - - @Override - protected MutableList<Object> equalsFields() { - return super.equalsFields() - .appendIfNotNull(notificationFilter).appendIfNotNull(onNotification); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxOperationPollConfig.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxOperationPollConfig.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxOperationPollConfig.java deleted file mode 100644 index 169f330..0000000 --- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxOperationPollConfig.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.event.feed.jmx; - -import java.util.Collections; -import java.util.List; - -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.sensor.feed.PollConfig; - -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -public class JmxOperationPollConfig<T> extends PollConfig<Object, T, JmxOperationPollConfig<T>>{ - - private ObjectName objectName; - private String operationName; - private List<String> signature = Collections.emptyList(); - private List<?> params = Collections.emptyList(); - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public JmxOperationPollConfig(AttributeSensor<T> sensor) { - super(sensor); - onSuccess((Function)Functions.identity()); - } - - public JmxOperationPollConfig(JmxOperationPollConfig<T> other) { - super(other); - this.objectName = other.objectName; - this.operationName = other.operationName; - this.signature = other.signature != null ? ImmutableList.copyOf(other.signature) : null; - this.params = other.params != null ? 
ImmutableList.copyOf(other.params) : null; - } - - public ObjectName getObjectName() { - return objectName; - } - - public String getOperationName() { - return operationName; - } - - public List<String> getSignature() { - return signature; - } - - public List<?> getParams() { - return params; - } - - public JmxOperationPollConfig<T> objectName(ObjectName val) { - this.objectName = val; return this; - } - - public JmxOperationPollConfig<T> objectName(String val) { - try { - return objectName(new ObjectName(val)); - } catch (MalformedObjectNameException e) { - throw new IllegalArgumentException("Invalid object name ("+val+")", e); - } - } - - public JmxOperationPollConfig<T> operationName(String val) { - this.operationName = val; return this; - } - - public JmxOperationPollConfig<T> operationSignature(List<String> val) { - this.signature = val; return this; - } - - public JmxOperationPollConfig<T> operationParams(List<?> val) { - this.params = val; return this; - } - - public List<?> buildOperationIdentity() { - // FIXME Have a build() method for ensuring signature is set, and making class subsequently immutable? - return ImmutableList.of(operationName, buildSignature(), params); - } - - private List<String> buildSignature() { - if (signature != null && signature.size() == params.size()) { - return signature; - } else { - List<String> derivedSignature = Lists.newLinkedList(); - for (Object param : params) { - Class<?> clazz = (param != null) ? param.getClass() : null; - String clazzName = (clazz != null) ? - (JmxHelper.CLASSES.containsKey(clazz.getSimpleName()) ? - JmxHelper.CLASSES.get(clazz.getSimpleName()) : clazz.getName()) : - Object.class.getName(); - derivedSignature.add(clazzName); - } - return derivedSignature; - } - } - - @Override protected String toStringBaseName() { return "jmx"; } - @Override protected String toStringPollSource() { return objectName+":"+operationName+(params!=null ? 
params : "[]"); }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxValueFunctions.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxValueFunctions.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxValueFunctions.java
deleted file mode 100644
index 0b61b42..0000000
--- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxValueFunctions.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.event.feed.jmx;
-
-import java.util.List;
-import java.util.Map;
-
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
-
-/**
- * Static helpers that turn JMX open-type values ({@link TabularData},
- * {@link CompositeData}) into plain {@link Map}s, both as direct conversions
- * and as Guava {@link Function}s wrapping those conversions.
- */
-public class JmxValueFunctions {
-
-    private static final Logger log = LoggerFactory.getLogger(JmxValueFunctions.class);
-
-    /**
-     * @return a function delegating to {@link #tabularDataToMap(TabularData)};
-     *         a new function object is created on every call.
-     */
-    public static Function<TabularData, Map> tabularDataToMap() {
-        return new Function<TabularData, Map>() {
-            @Override
-            public Map apply(TabularData table) {
-                return JmxValueFunctions.tabularDataToMap(table);
-            }
-        };
-    }
-
-    /**
-     * @return a function delegating to {@link #tabularDataToMapOfMaps(TabularData)};
-     *         a new function object is created on every call.
-     */
-    public static Function<TabularData, Map> tabularDataToMapOfMaps() {
-        return new Function<TabularData, Map>() {
-            @Override
-            public Map apply(TabularData table) {
-                return JmxValueFunctions.tabularDataToMapOfMaps(table);
-            }
-        };
-    }
-
-    /**
-     * @return a function delegating to {@link #compositeDataToMap(CompositeData)};
-     *         a new function object is created on every call.
-     */
-    public static Function<CompositeData, Map> compositeDataToMap() {
-        return new Function<CompositeData, Map>() {
-            @Override
-            public Map apply(CompositeData composite) {
-                return JmxValueFunctions.compositeDataToMap(composite);
-            }
-        };
-    }
-
-    /**
-     * Flattens every row of {@code table} into one insertion-ordered map of
-     * item name to item value. Rows are visited in the order of
-     * {@code table.values()}; when an item name was already mapped to a
-     * non-null value, the later value replaces it and a warning is logged.
-     *
-     * @param table the tabular data whose rows are all cast to {@link CompositeData}
-     * @return the merged item-name to value map
-     */
-    public static Map tabularDataToMap(TabularData table) {
-        Map<String, Object> flattened = Maps.newLinkedHashMap();
-        for (Object rawRow : table.values()) {
-            CompositeData row = (CompositeData) rawRow;
-            for (String item : row.getCompositeType().keySet()) {
-                Object previous = flattened.put(item, row.get(item));
-                if (previous == null) {
-                    continue;
-                }
-                log.warn("tablularDataToMap has overwritten key {}", item);
-            }
-        }
-        return flattened;
-    }
-
-    /**
-     * Converts {@code table} into an insertion-ordered map from each row key
-     * (an element of {@code table.keySet()}, cast to a list) to that row's
-     * contents as produced by {@link #compositeDataToMap(CompositeData)}.
-     *
-     * @param table the tabular data to convert
-     * @return map of row key to the row's item-name/value map
-     */
-    public static Map<List<?>, Map<String, Object>> tabularDataToMapOfMaps(TabularData table) {
-        Map<List<?>, Map<String, Object>> rowsByKey = Maps.newLinkedHashMap();
-        for (Object rawKey : table.keySet()) {
-            List<?> rowKey = (List<?>) rawKey;
-            // TabularData.get takes the key as an array, so the list form is converted for the lookup.
-            CompositeData row = (CompositeData) table.get(rowKey.toArray());
-            rowsByKey.put(rowKey, compositeDataToMap(row));
-        }
-        return rowsByKey;
-    }
-
-    /**
-     * Copies every item of {@code data}, in the key order reported by its
-     * composite type, into an insertion-ordered map. A warning is logged if a
-     * put replaces an existing non-null value.
-     *
-     * @param data the composite value to copy
-     * @return item name to item value
-     */
-    public static Map<String, Object> compositeDataToMap(CompositeData data) {
-        Map<String, Object> items = Maps.newLinkedHashMap();
-        for (String item : data.getCompositeType().keySet()) {
-            Object previous = items.put(item, data.get(item));
-            if (previous == null) {
-                continue;
-            }
-            log.warn("compositeDataToMap has overwritten key {}", item);
-        }
-        return items;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynCluster.java
---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynCluster.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynCluster.java new file mode 100644 index 0000000..daf7369 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynCluster.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.brooklyn.entity.brooklynnode;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.entity.ImplementedBy;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.effector.core.Effectors;
+import org.apache.brooklyn.entity.brooklynnode.effector.BrooklynNodeUpgradeEffectorBody;
+import org.apache.brooklyn.entity.group.DynamicCluster;
+import org.apache.brooklyn.sensor.core.Sensors;
+
+/**
+ * A {@link DynamicCluster} of {@link BrooklynNode}s. Declares the config key,
+ * sensor and effectors specific to such a cluster; the implementation is
+ * {@link BrooklynClusterImpl}.
+ */
+@ImplementedBy(BrooklynClusterImpl.class)
+public interface BrooklynCluster extends DynamicCluster {
+
+    /**
+     * Re-declaration of {@link DynamicCluster#MEMBER_SPEC} whose default is a
+     * spec for {@link BrooklynNode}.
+     */
+    ConfigKey<EntitySpec<?>> MEMBER_SPEC = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.MEMBER_SPEC,
+            EntitySpec.create(BrooklynNode.class));
+
+    /** Sensor ({@code brooklyncluster.master}) pointing at the node that is currently in MASTER state. */
+    AttributeSensor<BrooklynNode> MASTER_NODE = Sensors.newSensor(
+            BrooklynNode.class, "brooklyncluster.master", "Pointer to the child node with MASTER state in the cluster");
+
+    /** Holder for the {@code selectMaster} effector definition and its parameter. */
+    interface SelectMasterEffector {
+        /** Parameter naming the node to promote; declared with a {@code null} default. */
+        ConfigKey<String> NEW_MASTER_ID = ConfigKeys.newStringConfigKey(
+                "brooklyncluster.new_master_id", "The ID of the node to become master", null);
+        /** Abstract effector definition; the body is attached elsewhere (NOTE(review): presumably by the implementation class - confirm). */
+        Effector<Void> SELECT_MASTER = Effectors.effector(Void.class, "selectMaster")
+                .description("Select a new master in the cluster")
+                .parameter(NEW_MASTER_ID)
+                .buildAbstract();
+    }
+
+    /** Alias of {@link SelectMasterEffector#SELECT_MASTER}, exposed directly on the cluster interface. */
+    Effector<Void> SELECT_MASTER = SelectMasterEffector.SELECT_MASTER;
+
+    /** Holder for the {@code upgradeCluster} effector definition and its parameters. */
+    interface UpgradeClusterEffector {
+        /** Config key obtained from {@code BrooklynNode.DOWNLOAD_URL}. */
+        ConfigKey<String> DOWNLOAD_URL = BrooklynNode.DOWNLOAD_URL.getConfigKey();
+        /** Same key as {@code BrooklynNodeUpgradeEffectorBody.EXTRA_CONFIG}. */
+        ConfigKey<Map<String,Object>> EXTRA_CONFIG = BrooklynNodeUpgradeEffectorBody.EXTRA_CONFIG;
+
+        /**
+         * Abstract effector definition taking the suggested version, download
+         * URL and extra config as parameters; see the description string for
+         * the intended upgrade procedure.
+         */
+        Effector<Void> UPGRADE_CLUSTER = Effectors.effector(Void.class, "upgradeCluster")
+                .description("Upgrade the cluster with new distribution version, " +
+                        "by provisioning new nodes with the new version, failing over, " +
+                        "and then deprovisioning the original nodes")
+                .parameter(BrooklynNode.SUGGESTED_VERSION)
+                .parameter(DOWNLOAD_URL)
+                .parameter(EXTRA_CONFIG)
+                .buildAbstract();
+    }
+
+    /** Alias of {@link UpgradeClusterEffector#UPGRADE_CLUSTER}, exposed directly on the cluster interface. */
+    Effector<Void> UPGRADE_CLUSTER = UpgradeClusterEffector.UPGRADE_CLUSTER;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
new file mode 100644
index 0000000..61b0ba5
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.brooklynnode;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState;
+import org.apache.brooklyn.entity.brooklynnode.effector.BrooklynClusterUpgradeEffectorBody;
+import org.apache.brooklyn.entity.brooklynnode.effector.SelectMasterEffectorBody;
+import org.apache.brooklyn.entity.core.EntityFunctions;
+import org.apache.brooklyn.entity.core.EntityPredicates;
+import org.apache.brooklyn.entity.group.DynamicClusterImpl;
+import org.apache.brooklyn.entity.lifecycle.ServiceStateLogic;
+import org.apache.brooklyn.entity.lifecycle.ServiceStateLogic.ServiceProblemsLogic;
+import org.apache.brooklyn.sensor.enricher.Enrichers;
+import org.apache.brooklyn.sensor.feed.function.FunctionFeed;
+import org.apache.brooklyn.sensor.feed.function.FunctionPollConfig;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+
+/**
+ * Implementation of {@link BrooklynCluster}: registers the cluster effectors
+ * and keeps the {@code MASTER_NODE} sensor up to date by periodically looking
+ * for the member whose management state is MASTER.
+ */
+public class BrooklynClusterImpl extends DynamicClusterImpl implements BrooklynCluster {
+
+    private static final String MSG_NO_MASTER = "No master node in cluster";
+    private static final String MSG_TOO_MANY_MASTERS = "Too many master nodes in cluster";
+
+    private static final Logger LOG = LoggerFactory.getLogger(BrooklynClusterImpl.class);
+
+    // TODO should we set a default MEMBER_SPEC ? difficult though because we'd need to set a password
+
+    /**
+     * Adds the select-master and upgrade-cluster effectors, flags the
+     * "no master" problem until a master is found, starts a one-second poll
+     * that feeds {@code MASTER_NODE} from {@link #findMasterChild()}, and adds
+     * an enricher that derives {@code BrooklynNode.WEB_CONSOLE_URI} from the
+     * current {@code MASTER_NODE}.
+     */
+    @Override
+    public void init() {
+        super.init();
+        getMutableEntityType().addEffector(SelectMasterEffectorBody.SELECT_MASTER);
+        getMutableEntityType().addEffector(BrooklynClusterUpgradeEffectorBody.UPGRADE_CLUSTER);
+
+        // No master is known yet, so start in the "problem" state; the poll below clears it.
+        ServiceProblemsLogic.updateProblemsIndicator(this, MASTER_NODE, MSG_NO_MASTER);
+        addFeed(FunctionFeed.builder()
+                .entity(this)
+                .poll(new FunctionPollConfig<Object, BrooklynNode>(MASTER_NODE)
+                        .period(Duration.ONE_SECOND)
+                        .callable(new MasterChildFinder()))
+                .build());
+
+        addEnricher( Enrichers.builder().transforming(MASTER_NODE)
+                .uniqueTag("master-node-web-uri")
+                .publishing(BrooklynNode.WEB_CONSOLE_URI)
+                .computing(EntityFunctions.attribute(BrooklynNode.WEB_CONSOLE_URI))
+                .build() );
+    }
+
+    /** Poll callable that delegates to {@link #findMasterChild()} on the enclosing cluster. */
+    private final class MasterChildFinder implements Callable<BrooklynNode> {
+        @Override
+        public BrooklynNode call() throws Exception {
+            return findMasterChild();
+        }
+    }
+
+    /**
+     * Finds the member that currently reports {@code ManagementNodeState.MASTER}
+     * and updates the {@code MASTER_NODE} problem indicator accordingly.
+     * Members that are not {@link BrooklynNode}s are ignored, so the result
+     * needs no downcast.
+     *
+     * @return the master node; {@code null} when no member is master; one of
+     *         the two candidates when exactly two members report MASTER
+     * @throws IllegalStateException when three or more members report MASTER
+     */
+    BrooklynNode findMasterChild() {
+        Collection<BrooklynNode> masters = FluentIterable.from(getMembers())
+                .filter(BrooklynNode.class)
+                .filter(EntityPredicates.attributeEqualTo(BrooklynNode.MANAGEMENT_NODE_STATE, ManagementNodeState.MASTER))
+                .toList();
+
+        if (masters.isEmpty()) {
+            ServiceProblemsLogic.updateProblemsIndicator(this, MASTER_NODE, MSG_NO_MASTER);
+            return null;
+
+        } else if (masters.size() == 1) {
+            ServiceProblemsLogic.clearProblemsIndicator(this, MASTER_NODE);
+            return Iterables.getOnlyElement(masters);
+
+        } else if (masters.size() == 2) {
+            LOG.warn("Two masters detected, probably a handover just occurred: {}", masters);
+
+            //Don't clearProblemsIndicator - if there were no masters previously why have two now.
+            //But also don't set it. Probably hit a window where we have a new master
+            //its BrooklynNode picked it up, but the BrooklynNode
+            //for the old master hasn't refreshed its state yet.
+            //Just pick one of them, should sort itself out in next update.
+
+            //TODO Do set such indicator if this continues for an extended period of time
+
+            return masters.iterator().next();
+
+        } else {
+            ServiceProblemsLogic.updateProblemsIndicator(this, MASTER_NODE, MSG_TOO_MANY_MASTERS);
+            // The same text is used for the log entry and the exception, so it is built once.
+            String msg = "Multiple (>=3) master nodes in cluster: " + masters;
+            LOG.error(msg);
+            throw new IllegalStateException(msg);
+
+        }
+    }
+
+}
