Fix port check and sensor name and use RMI agent for JMX in zookeeper

Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/a24e0e4b
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/a24e0e4b
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/a24e0e4b

Branch: refs/heads/0.5.0
Commit: a24e0e4b8e4b41ac7458778ed0c2532cfbfbf53d
Parents: ea32c5a
Author: Andrew Kennedy <[email protected]>
Authored: Thu Mar 21 01:50:47 2013 +0000
Committer: Andrew Kennedy <[email protected]>
Committed: Fri Apr 19 10:36:07 2013 +0100

----------------------------------------------------------------------
 .../entity/messaging/kafka/KafkaBroker.java     |  2 +-
 .../entity/messaging/kafka/KafkaBrokerImpl.java | 12 +----
 .../messaging/kafka/KafkaClusterImpl.java       |  4 +-
 .../messaging/kafka/KafkaZookeeperImpl.java     | 10 +----
 .../kafka/KafkaZookeeperSshDriver.java          | 46 ++++++++++++++++++--
 .../entity/messaging/qpid/QpidBrokerImpl.java   | 11 +----
 6 files changed, 50 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/a24e0e4b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
index 01fa424..33cb7e0 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
@@ -43,7 +43,7 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka
             String.class, "kafka.config.server", "Server configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/server.properties");
 
     @SetFromFlag("zookeeper")
-    BasicConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicConfigKey<KafkaZookeeper>(KafkaZookeeper.class, "Kafka zookeeper entity");
+    BasicConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicConfigKey<KafkaZookeeper>(KafkaZookeeper.class, "kafka.broker.zookeeper", "Kafka zookeeper entity");
 
     AttributeSensor<Integer> BROKER_ID = new BasicAttributeSensor<Integer>(Integer.class, "kafka.broker.id", "Kafka unique broker ID");
 

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/a24e0e4b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
index ed9ae0c..4dd253e 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -76,7 +76,7 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
     @Override
     public KafkaZookeeper getZookeeper() { return getConfig(ZOOKEEPER); }
 
-    public KafkaTopic createTopic(Map properties) {
+    public KafkaTopic createTopic(Map<?, ?> properties) {
         KafkaTopic result = new KafkaTopic(properties, this);
         Entities.manage(result);
         result.create();
@@ -84,18 +84,10 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
     }
 
     @Override
-    public Class getDriverInterface() {
+    public Class<?> getDriverInterface() {
         return KafkaBrokerDriver.class;
     }
 
-    @Override
-    protected Collection<Integer> getRequiredOpenPorts() {
-        Set<Integer> ports = Sets.newLinkedHashSet(super.getRequiredOpenPorts());
-        ports.add(getAttribute(KAFKA_PORT));
-        log.debug("getRequiredOpenPorts detected expanded ports {} for {}", ports, this);
-        return ports;
-    }
-
     private ObjectName socketServerStatsMbean = JmxHelper.createObjectName("kafka:type=kafka.SocketServerStats");
     private volatile FunctionFeed functionFeed;
     private volatile JmxFeed jmxFeed;

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/a24e0e4b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
index 969a140..efc14fc 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
@@ -76,11 +76,11 @@ public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster {
         if (zookeeper == null) {
             EntitySpec<KafkaZookeeper> zookeeperSpec = getAttribute(ZOOKEEPER_SPEC);
             if (zookeeperSpec == null) {
-                log.debug("creating controller using default spec for {}", this);
+                log.debug("creating zookeeper using default spec for {}", this);
                 zookeeperSpec = BasicEntitySpec.newInstance(KafkaZookeeper.class);
                 setAttribute(ZOOKEEPER_SPEC, zookeeperSpec);
             } else {
-                log.debug("creating controller using custom spec for {}", this);
+                log.debug("creating zookeeper using custom spec for {}", this);
             }
             zookeeper = getEntityManager().createEntity(WrappingEntitySpec.newInstance(zookeeperSpec).parent(this));
             if (Entities.isManaged(this)) Entities.manage(zookeeper);

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/a24e0e4b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
index 8d9d4f4..0554011 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
@@ -67,18 +67,10 @@ public class KafkaZookeeperImpl extends SoftwareProcessImpl implements KafkaZook
     public String getHostname() { return getAttribute(HOSTNAME); }
 
     @Override
-    public Class getDriverInterface() {
+    public Class<?> getDriverInterface() {
         return KafkaZookeeperDriver.class;
     }
 
-    @Override
-    protected Collection<Integer> getRequiredOpenPorts() {
-        Set<Integer> ports = Sets.newLinkedHashSet(super.getRequiredOpenPorts());
-        ports.add(getAttribute(ZOOKEEPER_PORT));
-        log.debug("getRequiredOpenPorts detected expanded ports {} for {}", ports, this);
-        return ports;
-    }
-
     private ObjectName zookeeperMbean = JmxHelper.createObjectName("org.apache.ZooKeeperService:name0=StandaloneServer_port-1");
     private volatile FunctionFeed functionFeed;
     private volatile JmxFeed jmxFeed;

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/a24e0e4b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
index bbaaa0d..c62cb0a 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
@@ -24,13 +24,17 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import brooklyn.BrooklynVersion;
 import brooklyn.entity.basic.lifecycle.CommonCommands;
 import brooklyn.entity.drivers.downloads.DownloadResolver;
 import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
 import brooklyn.location.basic.SshMachineLocation;
 import brooklyn.util.MutableMap;
 import brooklyn.util.NetworkUtils;
+import brooklyn.util.ResourceUtils;
+import brooklyn.util.jmx.jmxrmi.JmxRmiAgent;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
 public class KafkaZookeeperSshDriver extends JavaSoftwareProcessSshDriver implements KafkaZookeeperDriver {
@@ -83,8 +87,23 @@ public class KafkaZookeeperSshDriver extends JavaSoftwareProcessSshDriver implem
                 .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir()))
                 .execute();
 
-        String serverConfig = entity.getConfig(KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE);
-        copyTemplate(serverConfig, "zookeeper.properties");
+        String zookeeperConfig = entity.getConfig(KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE);
+        copyTemplate(zookeeperConfig, "zookeeper.properties");
+
+        // Copy JMX agent Jar to server
+        getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath());
+    }
+
+    public String getJmxRmiAgentJarBasename() {
+        return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar";
+    }
+
+    public String getJmxRmiAgentJarUrl() {
+        return "classpath://" + getJmxRmiAgentJarBasename();
+    }
+
+    public String getJmxRmiAgentJarDestinationFilePath() {
+        return getRunDir() + "/" + getJmxRmiAgentJarBasename();
     }
 
     @Override
@@ -111,10 +130,31 @@ public class KafkaZookeeperSshDriver extends JavaSoftwareProcessSshDriver implem
     }
 
     @Override
+    protected Map<String, ?> getJmxJavaSystemProperties() {
+        return MutableMap.<String, Object> builder()
+                .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort())
+                .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort())
+                .put("com.sun.management.jmxremote.ssl", false)
+                .put("com.sun.management.jmxremote.authenticate", false)
+                .put("java.rmi.server.hostname", getHostname())
+                .build();
+    }
+
+    @Override
+    protected List<String> getJmxJavaConfigOptions() {
+        return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath());
+    }
+
+    /**
+     * Use RMI agent to provide JMX.
+     */
+    @Override
     public Map<String, String> getShellEnvironment() {
         Map<String, String> orig = super.getShellEnvironment();
+        String kafkaJmxOpts = orig.remove("JAVA_OPTS");
         return MutableMap.<String, String>builder()
-                .put("KAFKA_JMX_OPTS", orig.get("JAVA_OPTS"))
+                .putAll(orig)
+                .put("KAFKA_JMX_OPTS", kafkaJmxOpts)
                 .build();
     }
 

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/a24e0e4b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java
index 441e996..938e1e6 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java
@@ -99,18 +99,9 @@ public class QpidBrokerImpl extends JMSBroker<QpidQueue, QpidTopic> implements Q
     }
 
     @Override
-    protected Collection<Integer> getRequiredOpenPorts() {
-        Set<Integer> ports = Sets.newLinkedHashSet(super.getRequiredOpenPorts());
-        ports.add(getAttribute(AMQP_PORT));
-        ports.add(getAttribute(HTTP_MANAGEMENT_PORT));
-        log.debug("getRequiredOpenPorts detected expanded (qpid) ports {} for {}", ports, this);
-        return ports;
-    }
-
-    @Override
     protected void connectSensors() {
         String serverInfoMBeanName = "org.apache.qpid:type=ServerInformation,name=ServerInformation";
-        
+
         jmxFeed = JmxFeed.builder()
                 .entity(this)
                 .period(500, TimeUnit.MILLISECONDS)

Reply via email to