Fix port check and sensor name and use RMI agent for JMX in zookeeper
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/a24e0e4b Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/a24e0e4b Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/a24e0e4b Branch: refs/heads/0.5.0 Commit: a24e0e4b8e4b41ac7458778ed0c2532cfbfbf53d Parents: ea32c5a Author: Andrew Kennedy <[email protected]> Authored: Thu Mar 21 01:50:47 2013 +0000 Committer: Andrew Kennedy <[email protected]> Committed: Fri Apr 19 10:36:07 2013 +0100 ---------------------------------------------------------------------- .../entity/messaging/kafka/KafkaBroker.java | 2 +- .../entity/messaging/kafka/KafkaBrokerImpl.java | 12 +---- .../messaging/kafka/KafkaClusterImpl.java | 4 +- .../messaging/kafka/KafkaZookeeperImpl.java | 10 +---- .../kafka/KafkaZookeeperSshDriver.java | 46 ++++++++++++++++++-- .../entity/messaging/qpid/QpidBrokerImpl.java | 11 +---- 6 files changed, 50 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/a24e0e4b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java index 01fa424..33cb7e0 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java @@ -43,7 +43,7 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka String.class, "kafka.config.server", "Server configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/server.properties"); @SetFromFlag("zookeeper") - BasicConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicConfigKey<KafkaZookeeper>(KafkaZookeeper.class, "Kafka zookeeper entity"); + BasicConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicConfigKey<KafkaZookeeper>(KafkaZookeeper.class, "kafka.broker.zookeeper", "Kafka zookeeper entity"); AttributeSensor<Integer> BROKER_ID = new BasicAttributeSensor<Integer>(Integer.class, "kafka.broker.id", "Kafka unique broker ID"); http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/a24e0e4b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java index ed9ae0c..4dd253e 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java @@ -76,7 +76,7 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke @Override public KafkaZookeeper getZookeeper() { return getConfig(ZOOKEEPER); } - public KafkaTopic createTopic(Map properties) { + public KafkaTopic createTopic(Map<?, ?> properties) { KafkaTopic result = new KafkaTopic(properties, this); Entities.manage(result); result.create(); @@ -84,18 +84,10 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke } @Override - public Class getDriverInterface() { + public Class<?> getDriverInterface() { return KafkaBrokerDriver.class; } - @Override - protected Collection<Integer> getRequiredOpenPorts() { - Set<Integer> ports = Sets.newLinkedHashSet(super.getRequiredOpenPorts()); - ports.add(getAttribute(KAFKA_PORT)); - log.debug("getRequiredOpenPorts detected expanded ports {} for {}", ports, this); - return ports; - } - private ObjectName socketServerStatsMbean = JmxHelper.createObjectName("kafka:type=kafka.SocketServerStats"); private volatile FunctionFeed functionFeed; private volatile JmxFeed jmxFeed; http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/a24e0e4b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java index 969a140..efc14fc 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java @@ -76,11 +76,11 @@ public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster { if (zookeeper == null) { EntitySpec<KafkaZookeeper> zookeeperSpec = getAttribute(ZOOKEEPER_SPEC); if (zookeeperSpec == null) { - log.debug("creating controller using default spec for {}", this); + log.debug("creating zookeeper using default spec for {}", this); zookeeperSpec = BasicEntitySpec.newInstance(KafkaZookeeper.class); setAttribute(ZOOKEEPER_SPEC, zookeeperSpec); } else { - log.debug("creating controller using custom spec for {}", this); + log.debug("creating zookeeper using custom spec for {}", this); } zookeeper = getEntityManager().createEntity(WrappingEntitySpec.newInstance(zookeeperSpec).parent(this)); if (Entities.isManaged(this)) Entities.manage(zookeeper); http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/a24e0e4b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java index 8d9d4f4..0554011 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java @@ -67,18 +67,10 @@ public class KafkaZookeeperImpl extends SoftwareProcessImpl implements KafkaZook public String getHostname() { return getAttribute(HOSTNAME); } @Override - public Class getDriverInterface() { + public Class<?> getDriverInterface() { return KafkaZookeeperDriver.class; } - @Override - protected Collection<Integer> getRequiredOpenPorts() { - Set<Integer> ports = Sets.newLinkedHashSet(super.getRequiredOpenPorts()); - ports.add(getAttribute(ZOOKEEPER_PORT)); - log.debug("getRequiredOpenPorts detected expanded ports {} for {}", ports, this); - return ports; - } - private ObjectName zookeeperMbean = JmxHelper.createObjectName("org.apache.ZooKeeperService:name0=StandaloneServer_port-1"); private volatile FunctionFeed functionFeed; private volatile JmxFeed jmxFeed; http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/a24e0e4b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java index bbaaa0d..c62cb0a 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java @@ -24,13 +24,17 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import brooklyn.BrooklynVersion; import brooklyn.entity.basic.lifecycle.CommonCommands; import brooklyn.entity.drivers.downloads.DownloadResolver; import brooklyn.entity.java.JavaSoftwareProcessSshDriver; import brooklyn.location.basic.SshMachineLocation; import brooklyn.util.MutableMap; import brooklyn.util.NetworkUtils; +import brooklyn.util.ResourceUtils; +import brooklyn.util.jmx.jmxrmi.JmxRmiAgent; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; public class KafkaZookeeperSshDriver extends JavaSoftwareProcessSshDriver implements KafkaZookeeperDriver { @@ -83,8 +87,23 @@ public class KafkaZookeeperSshDriver extends JavaSoftwareProcessSshDriver implem .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir())) .execute(); - String serverConfig = entity.getConfig(KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE); - copyTemplate(serverConfig, "zookeeper.properties"); + String zookeeperConfig = entity.getConfig(KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE); + copyTemplate(zookeeperConfig, "zookeeper.properties"); + + // Copy JMX agent Jar to server + getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath()); + } + + public String getJmxRmiAgentJarBasename() { + return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar"; + } + + public String getJmxRmiAgentJarUrl() { + return "classpath://" + getJmxRmiAgentJarBasename(); + } + + public String getJmxRmiAgentJarDestinationFilePath() { + return getRunDir() + "/" + getJmxRmiAgentJarBasename(); } @Override @@ -111,10 +130,31 @@ public class KafkaZookeeperSshDriver extends JavaSoftwareProcessSshDriver implem } @Override + protected Map<String, ?> getJmxJavaSystemProperties() { + return MutableMap.<String, Object> builder() + .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort()) + .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort()) + .put("com.sun.management.jmxremote.ssl", false) + .put("com.sun.management.jmxremote.authenticate", false) + .put("java.rmi.server.hostname", getHostname()) + .build(); + } + + @Override + protected List<String> getJmxJavaConfigOptions() { + return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath()); + } + + /** + * Use RMI agent to provide JMX. + */ + @Override public Map<String, String> getShellEnvironment() { Map<String, String> orig = super.getShellEnvironment(); + String kafkaJmxOpts = orig.remove("JAVA_OPTS"); return MutableMap.<String, String>builder() - .put("KAFKA_JMX_OPTS", orig.get("JAVA_OPTS")) + .putAll(orig) + .put("KAFKA_JMX_OPTS", kafkaJmxOpts) .build(); } http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/a24e0e4b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java index 441e996..938e1e6 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java @@ -99,18 +99,9 @@ public class QpidBrokerImpl extends JMSBroker<QpidQueue, QpidTopic> implements Q } @Override - protected Collection<Integer> getRequiredOpenPorts() { - Set<Integer> ports = Sets.newLinkedHashSet(super.getRequiredOpenPorts()); - ports.add(getAttribute(AMQP_PORT)); - ports.add(getAttribute(HTTP_MANAGEMENT_PORT)); - log.debug("getRequiredOpenPorts detected expanded (qpid) ports {} for {}", ports, this); - return ports; - } - - @Override protected void connectSensors() { String serverInfoMBeanName = "org.apache.qpid:type=ServerInformation,name=ServerInformation"; - + jmxFeed = JmxFeed.builder() .entity(this) .period(500, TimeUnit.MILLISECONDS)
