Repository: samza Updated Branches: refs/heads/master 9709d9d46 -> 4f7bbb054
SAMZA-670: added JMX access information to the dashboard Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4f7bbb05 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4f7bbb05 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4f7bbb05 Branch: refs/heads/master Commit: 4f7bbb054ac7c5f6bb5528189d49f0b4942faa7e Parents: 9709d9d Author: József Márton Jung <[email protected]> Authored: Wed Jul 15 22:38:12 2015 -0700 Committer: Yan Fang <[email protected]> Committed: Wed Jul 15 22:38:12 2015 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 1 + .../apache/samza/container/LocalityManager.java | 34 ++++++++++++-------- .../stream/CoordinatorStreamMessage.java | 21 +++++++++--- .../org/apache/samza/job/model/JobModel.java | 34 +++++++++++++++++--- .../apache/samza/container/SamzaContainer.scala | 15 +++++---- .../samza/coordinator/JobCoordinator.scala | 17 ++-------- .../samza/job/local/ThreadJobFactory.scala | 4 +-- .../org/apache/samza/metrics/JmxServer.scala | 2 ++ .../samza/container/TestSamzaContainer.scala | 3 +- .../resources/scalate/WEB-INF/views/index.scaml | 11 +++++++ .../apache/samza/job/yarn/SamzaAppMaster.scala | 5 +++ .../samza/job/yarn/SamzaAppMasterState.scala | 8 ++--- 12 files changed, 106 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 3374f0c..de835c7 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -55,6 +55,7 @@ <allow class="org.apache.samza.Partition" /> <allow class="org.apache.samza.container.TaskName" /> <allow class="org.apache.samza.system.SystemStreamPartition" /> + <allow class="org.apache.samza.container.LocalityManager" /> </subpackage> </subpackage> http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java index e661e12..55c258f 100644 --- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java @@ -38,13 +38,13 @@ public class LocalityManager { private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer; private final CoordinatorStreamSystemProducer coordinatorStreamProducer; private static final String SOURCE = "SamzaContainer-"; - private Map<Integer, String> containerToHostMapping; + private Map<Integer, Map<String, String>> containerToHostMapping; public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamProducer, CoordinatorStreamSystemConsumer coordinatorStreamConsumer) { this.coordinatorStreamConsumer = coordinatorStreamConsumer; this.coordinatorStreamProducer = coordinatorStreamProducer; - this.containerToHostMapping = new HashMap<Integer, String>(); + this.containerToHostMapping = new HashMap<>(); } public void start() { @@ -65,26 +65,34 @@ public class LocalityManager { coordinatorStreamProducer.register(LocalityManager.SOURCE + sourceSuffix); } - public Map<Integer, String> readContainerLocality() { - Map<Integer, String> allMappings = new HashMap<Integer, String>(); + public Map<Integer, Map<String, String>> readContainerLocality() { + Map<Integer, Map<String, String>> allMappings = new HashMap<>(); for (CoordinatorStreamMessage message: coordinatorStreamConsumer.getBootstrappedStream(SetContainerHostMapping.TYPE)) { SetContainerHostMapping mapping = new SetContainerHostMapping(message); - allMappings.put(Integer.parseInt(mapping.getKey()), mapping.getHostLocality()); + Map<String, String> localityMappings = new HashMap<>(); + localityMappings.put(SetContainerHostMapping.IP_KEY, mapping.getHostLocality()); + localityMappings.put(SetContainerHostMapping.JMX_URL_KEY, mapping.getJmxUrl()); + localityMappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, mapping.getJmxTunnelingUrl()); + log.info(String.format("Read locality for container %s: %s", mapping.getKey(), localityMappings)); + allMappings.put(Integer.parseInt(mapping.getKey()), localityMappings); } containerToHostMapping = Collections.unmodifiableMap(allMappings); return allMappings; } - - public void writeContainerToHostMapping(Integer containerId, String hostHttpAddress) { - String existingMapping = containerToHostMapping.get(containerId); - if (existingMapping != null && !existingMapping.equals(hostHttpAddress)) { - log.info("Container {} moved from {} to {}", new Object[]{containerId, existingMapping, hostHttpAddress}); + public void writeContainerToHostMapping(Integer containerId, String hostHttpAddress, String jmxAddress, String jmxTunnelingAddress) { + Map<String, String> existingMappings = containerToHostMapping.get(containerId); + String existingIpMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.IP_KEY) : null; + if (existingIpMapping != null && !existingIpMapping.equals(hostHttpAddress)) { + log.info("Container {} moved from {} to {}", new Object[]{containerId, existingIpMapping, hostHttpAddress}); } else { log.info("Container {} started at {}", containerId, hostHttpAddress); } - coordinatorStreamProducer.send(new SetContainerHostMapping(SOURCE + containerId, String.valueOf(containerId), hostHttpAddress)); - containerToHostMapping.put(containerId, hostHttpAddress); + coordinatorStreamProducer.send(new SetContainerHostMapping(SOURCE + containerId, String.valueOf(containerId), hostHttpAddress, jmxAddress, jmxTunnelingAddress)); + Map<String, String> mappings = new HashMap<>(); + mappings.put(SetContainerHostMapping.IP_KEY, hostHttpAddress); + mappings.put(SetContainerHostMapping.JMX_URL_KEY, jmxAddress); + mappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, jmxTunnelingAddress); + containerToHostMapping.put(containerId, mappings); } - } http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java index 6c1e488..6bd1bd3 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java @@ -485,24 +485,29 @@ public class CoordinatorStreamMessage { * Source: "SamzaContainer-$ContainerId" * MessageMap: * { - * ip: InetAddressString + * ip: InetAddressString, + * jmx-url: jmxAddressString + * jmx-tunneling-url: jmxTunnelingAddressString * } * } * */ public static class SetContainerHostMapping extends CoordinatorStreamMessage { public static final String TYPE = "set-container-host-assignment"; - private static final String IP_KEY = "ip"; + public static final String IP_KEY = "ip"; + public static final String JMX_URL_KEY = "jmx-url"; + public static final String JMX_TUNNELING_URL_KEY = "jmx-tunneling-url"; public SetContainerHostMapping(CoordinatorStreamMessage message) { super(message.getKeyArray(), message.getMessageMap()); } - public SetContainerHostMapping(String source, String key, String hostHttpAddress) { + public SetContainerHostMapping(String source, String key, String hostHttpAddress, String jmxAddress, String jmxTunnelingAddress) { super(source); setType(TYPE); setKey(key); putMessageValue(IP_KEY, hostHttpAddress); - + putMessageValue(JMX_URL_KEY, jmxAddress); + putMessageValue(JMX_TUNNELING_URL_KEY, jmxTunnelingAddress); } public String getHostLocality() { @@ -510,5 +515,13 @@ public class CoordinatorStreamMessage { } + public String getJmxUrl() { + return getMessageValue(JMX_URL_KEY); + } + + public String getJmxTunnelingUrl() { + return getMessageValue(JMX_TUNNELING_URL_KEY); + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java index 95a2ce5..ad6387d 100644 --- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java +++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java @@ -20,9 +20,9 @@ package org.apache.samza.job.model; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import org.apache.samza.config.Config; +import org.apache.samza.container.LocalityManager; /** * <p> @@ -37,20 +37,22 @@ import org.apache.samza.config.Config; * </p> */ public class JobModel { + private static final String EMPTY_STRING = ""; private final Config config; private final Map<Integer, ContainerModel> containers; - private final Map<Integer, String> containerToHostMapping; + + private final LocalityManager localityManager; public int maxChangeLogStreamPartitions; public JobModel(Config config, Map<Integer, ContainerModel> containers) { - this(config, containers, new HashMap<Integer, String>()); + this(config, containers, null); } - public JobModel(Config config, Map<Integer, ContainerModel> containers, Map<Integer, String> containerToHostMapping) { + public JobModel(Config config, Map<Integer, ContainerModel> containers, LocalityManager localityManager) { this.config = config; this.containers = Collections.unmodifiableMap(containers); - this.containerToHostMapping = Collections.unmodifiableMap(containerToHostMapping); + this.localityManager = localityManager; // Compute the number of change log stream partitions as the maximum partition-id // of all total number of tasks of the job; Increment by 1 because partition ids @@ -68,6 +70,28 @@ public class JobModel { return config; } + /** + * Returns the container to host mapping for a given container ID and mapping key + * + * @param containerId the ID of the container + * @param key mapping key which is one of the keys declared in {@link org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping} + * @return the value if it exists for a given container and key, otherwise an empty string + */ + public String getContainerToHostValue(Integer containerId, String key) { + if (localityManager == null) { + return EMPTY_STRING; + } + final Map<String, String> mappings = localityManager.readContainerLocality().get(containerId); + if (mappings == null) { + return EMPTY_STRING; + } + if (!mappings.containsKey(key)) { + return EMPTY_STRING; + } + return mappings.get(key); + } + + public Map<Integer, ContainerModel> getContainers() { return containers; } http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index cbacd18..27b2517 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -92,7 +92,7 @@ object SamzaContainer extends Logging { try { jmxServer = newJmxServer() - SamzaContainer(containerModel, jobModel).run + SamzaContainer(containerModel, jobModel, jmxServer).run } finally { if (jmxServer != null) { jmxServer.stop @@ -133,7 +133,7 @@ object SamzaContainer extends Logging { serde } - def apply(containerModel: ContainerModel, jobModel: JobModel) = { + def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: JmxServer) = { val config = jobModel.getConfig val containerId = containerModel.getContainerId val containerName = "samza-container-%s" format containerId @@ -407,7 +407,6 @@ object SamzaContainer extends Logging { .values .map(_.getTaskName) .toSet - val containerContext = new SamzaContainerContext(containerId, config, taskNames) val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => { @@ -541,7 +540,8 @@ object SamzaContainer extends Logging { localityManager = localityManager, metrics = samzaContainerMetrics, reporters = reporters, - jvm = jvm) + jvm = jvm, + jmxServer = jmxServer) } } @@ -552,6 +552,7 @@ class SamzaContainer( consumerMultiplexer: SystemConsumers, producerMultiplexer: SystemProducers, metrics: SamzaContainerMetrics, + jmxServer: JmxServer, offsetManager: OffsetManager = new OffsetManager, localityManager: LocalityManager = null, reporters: Map[String, MetricsReporter] = Map(), @@ -625,10 +626,12 @@ class SamzaContainer( localityManager.start localityManager.register(String.valueOf(containerContext.id)) - info("Writing container locality to Coordinator Stream") + info("Writing container locality and JMX address to Coordinator Stream") try { val hostInetAddress = InetAddress.getLocalHost.getHostAddress - localityManager.writeContainerToHostMapping(containerContext.id, hostInetAddress) + val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else "" + val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else "" + localityManager.writeContainerToHostMapping(containerContext.id, hostInetAddress, jmxUrl, jmxTunnelingUrl) } catch { case uhe: UnknownHostException => warn("Received UnknownHostException when persisting locality info for container %d: %s" format (containerContext.id, uhe.getMessage)) //No-op http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala index 73c58a7..f621611 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala @@ -21,7 +21,7 @@ package org.apache.samza.coordinator import org.apache.samza.config.Config -import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel} +import org.apache.samza.job.model.{JobModel, TaskModel} import org.apache.samza.SamzaException import org.apache.samza.container.grouper.task.GroupByContainerCount import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory @@ -42,7 +42,7 @@ import org.apache.samza.coordinator.server.HttpServer import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} import org.apache.samza.coordinator.server.JobServlet import org.apache.samza.config.SystemConfig.Config2System -import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamMessage, CoordinatorStreamSystemFactory} +import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory import org.apache.samza.config.ConfigRewriter /** @@ -67,8 +67,6 @@ object JobCoordinator extends Logging { coordinatorSystemConsumer.start debug("Bootstrapping coordinator system stream.") coordinatorSystemConsumer.bootstrap - debug("Stopping coordinator system stream.") - coordinatorSystemConsumer.stop val config = coordinatorSystemConsumer.getConfig info("Got config: %s" format config) val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator") @@ -276,16 +274,7 @@ object JobCoordinator extends Logging { val containerModels = containerGrouper.group(taskModels).map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap - val containerLocality = if(localityManager != null) { - localityManager.readContainerLocality() - } else { - new util.HashMap[Integer, String]() - } - - containerLocality.foreach{case (container: Integer, location: String) => - info("Container id %d --> %s" format (container.intValue(), location)) - } - new JobModel(config, containerModels, containerLocality) + new JobModel(config, containerModels, localityManager) } } } http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index 3f2f70e..5acfe87 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -20,7 +20,7 @@ package org.apache.samza.job.local -import org.apache.samza.metrics.MetricsRegistryMap +import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap} import org.apache.samza.util.Logging import org.apache.samza.SamzaException import org.apache.samza.config.Config @@ -48,7 +48,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging { try { coordinator.start - new ThreadJob(SamzaContainer(containerModel, coordinator.jobModel)) + new ThreadJob(SamzaContainer(containerModel, coordinator.jobModel, new JmxServer)) } finally { coordinator.stop } http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala index f343faf..de45123 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala @@ -115,4 +115,6 @@ class JmxServer(requestedPort: Int) extends Logging { def stop = jmxServer.stop override def toString = "JmxServer registry port=%d server port=%d url=%s" format (getRegistryPort, getServerPort, getJmxUrl) + + def getTunnelingJmxUrl = getJmxUrl.replaceAll("localhost", hostname) } http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 9fb1aa9..84fdeaa 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -166,7 +166,8 @@ class TestSamzaContainer extends AssertionsForJUnit { runLoop = runLoop, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, - metrics = new SamzaContainerMetrics + metrics = new SamzaContainerMetrics, + jmxServer = null ) try { container.run http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml index cf0d2fc..41303f7 100644 --- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml +++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml @@ -89,6 +89,12 @@ %td.key Application master container %td %a(target="_blank" href="http://#{state.nodeHost}:#{state.nodeHttpPort.toString}/node/containerlogs/#{state.containerId.toString}/#{username}")= state.containerId.toString + %tr + %td.key JMX server url + %td= state.jmxUrl + %tr + %td.key JMX server tunneling url + %td= state.jmxTunnelingUrl %div.tab-pane#containers %h2 Containers @@ -116,6 +122,7 @@ %th Node %th Start Time %th Up Time + %th JMX access %tbody - for((containerId, container) <- state.runningContainers) %tr @@ -128,6 +135,10 @@ Start time: #{container.startTimeStr()} %td Up time: #{container.upTimeStr()} + %td + Ordinary: #{state.jobCoordinator.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping.JMX_URL_KEY)} + Tunneling: #{state.jobCoordinator.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping.JMX_TUNNELING_URL_KEY)} + %div.tab-pane#task-groups %h2 Task Groups http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala index 20aa373..af42c6a 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala @@ -91,6 +91,11 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler { try { // wire up all of the yarn event listeners val state = new SamzaAppMasterState(jobCoordinator, -1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt) + if (jmxServer.isDefined) { + state.jmxUrl = jmxServer.get.getJmxUrl + state.jmxTunnelingUrl = jmxServer.get.getTunnelingJmxUrl + } + val service = new SamzaAppMasterService(config, state, registry, clientHelper) val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient) val metrics = new SamzaAppMasterMetrics(config, state, registry) http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala index 1445605..f667c83 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala @@ -21,11 +21,7 @@ package org.apache.samza.job.yarn import org.apache.samza.util.Logging import org.apache.hadoop.yarn.api.records.FinalApplicationStatus import org.apache.hadoop.yarn.api.records.ContainerId -import java.util -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.container.TaskName import java.net.URL -import org.apache.samza.job.model.JobModel import org.apache.samza.coordinator.JobCoordinator /** @@ -53,4 +49,8 @@ class SamzaAppMasterState(val jobCoordinator: JobCoordinator, val taskId: Int, v // controlled on startup var appAttemptId = containerId.getApplicationAttemptId + + // JMX address + var jmxUrl = "" + var jmxTunnelingUrl = "" }
