Repository: samza Updated Branches: refs/heads/master fde243475 -> 4072c9e85
SAMZA-1324 : Add a metricsreporter lifecycle for JobCoordinator component of StreamProcessor Added a metrics class for ZK based job coordinator that reports a few metrics. Author: PawasChhokra <Jaimatadi1$> Author: Pawas Chhokra <[email protected]> Author: PawasChhokra <[email protected]> Reviewers: Navina Ramesh <[email protected]> Closes #223 from PawasChhokra/ZkJobCoordinatorMetrics Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4072c9e8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4072c9e8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4072c9e8 Branch: refs/heads/master Commit: 4072c9e85c86212372eb527509ffa9c3eba97315 Parents: fde2434 Author: PawasChhokra <[email protected]> Authored: Mon Jun 26 16:35:26 2017 -0700 Committer: navina <[email protected]> Committed: Mon Jun 26 16:35:26 2017 -0700 ---------------------------------------------------------------------- .../versioned/container/metrics-table.html | 45 +++++++++++ .../apache/samza/container/LocalityManager.java | 1 + .../samza/zk/ZkBarrierForVersionUpgrade.java | 10 +-- .../org/apache/samza/zk/ZkJobCoordinator.java | 46 +++++++++-- .../samza/zk/ZkJobCoordinatorFactory.java | 4 +- .../samza/zk/ZkJobCoordinatorMetrics.java | 83 ++++++++++++++++++++ .../main/java/org/apache/samza/zk/ZkUtils.java | 30 +++++++ .../apache/samza/container/SamzaContainer.scala | 2 + .../samza/container/SamzaContainerMetrics.scala | 1 + .../zk/TestZkBarrierForVersionUpgrade.java | 5 +- .../apache/samza/zk/TestZkLeaderElector.java | 3 +- .../java/org/apache/samza/zk/TestZkUtils.java | 5 +- 12 files changed, 218 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/docs/learn/documentation/versioned/container/metrics-table.html ---------------------------------------------------------------------- diff --git 
a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html index 2eb46e3..7fbbc40 100644 --- a/docs/learn/documentation/versioned/container/metrics-table.html +++ b/docs/learn/documentation/versioned/container/metrics-table.html @@ -142,6 +142,7 @@ <li><a href="#bootstrapping-chooser-metrics">BootstrappingChooserMetrics</a></li> <li><a href="#hdfs-system-producer-metrics">HdfsSystemProducerMetrics</a></li> <li><a href="#elasticsearch-system-producer-metrics">ElasticsearchSystemProducerMetrics</a></li> + <li><a href="#zookeeper-job-coordinator-metrics">ZkJobCoordinatorMetrics</a></li> </ul> <p>Words highlighted like <span class="system">this</span> are placeholders for your own variable names defined in configuration file or system variables defined while starting the job.</p> <p id="average-time" style="color: #00a">Note: Average time is calculated for the current time window (set to 300 seconds)</p> @@ -215,6 +216,10 @@ <td>physical-memory-mb</td> <td>The physical memory used by the Samza container process (native + on heap) (in megabytes)</td> </tr> + <tr> + <td>container-startup-time</td> + <td><a href="#average-time">Average time</a> spent for the container to startup</td> + </tr> <tr> <th colspan="2" class="section" id="job-coordinator">job-coordinator</th> @@ -887,6 +892,46 @@ <td><span class="system">system</span>-version-conflicts</td> <td>Number of times the request could not be completed due to a conflict with the current state of the document</td> </tr> + + <tr> + <th colspan="2" class="section" id="zookeeper-job-coordinator-metrics">org.apache.samza.zk.ZkJobCoordinatorMetrics</th> + </tr> + <tr> + <td>reads</td> + <td>Number of reads from Zookeeper</td> + </tr> + <tr> + <td>writes</td> + <td>Number of writes to Zookeeper</td> + </tr> + <tr> + <td>subscriptions</td> + <td>Number of subscriptions to znodes in Zookeeper</td> + </tr> + <tr> + <td>zk-connection-error</td> + <td>Number 
of Zookeeper connection errors</td> + </tr> + <tr> + <td>is-leader</td> + <td>Denotes if the processor is a leader or not</td> + </tr> + <tr> + <td>barrier-creation</td> + <td>Number of times a barrier was created by the leader</td> + </tr> + <tr> + <td>barrier-state-change</td> + <td>Number of times the barrier state changed</td> + </tr> + <tr> + <td>barrier-error</td> + <td>Number of times the barrier encountered an error while attaining consensus on the job model version</td> + </tr> + <tr> + <td>single-barrier-rebalancing-time</td> + <td><a href="#average-time">Average time</a> taken for all the processors to get the latest version of the job model after single processor change (without the occurrence of a barrier timeout)</td> + </tr> </tbody> </table> </body> http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java index 22380d3..bafebcc 100644 --- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java @@ -54,6 +54,7 @@ public class LocalityManager extends AbstractCoordinatorStreamManager { this.taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamProducer, coordinatorStreamConsumer); } + /** * Special constructor that creates a write-only {@link LocalityManager} that only writes * to coordinator stream in {@link SamzaContainer} http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java index 581387d..c1343b1 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java @@ -93,7 +93,7 @@ public class ZkBarrierForVersionUpgrade { // subscribe for participant's list changes LOG.info("Subscribing for child changes at " + barrierParticipantsPath); - zkUtils.getZkClient().subscribeChildChanges(barrierParticipantsPath, new ZkBarrierChangeHandler(version, participants)); + zkUtils.subscribeChildChanges(barrierParticipantsPath, new ZkBarrierChangeHandler(version, participants)); barrierListenerOptional.ifPresent(zkBarrierListener -> zkBarrierListener.onBarrierCreated(version)); } @@ -106,7 +106,7 @@ public class ZkBarrierForVersionUpgrade { */ public void join(String version, String participantId) { String barrierDonePath = keyBuilder.getBarrierStatePath(version); - zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, version)); + zkUtils.subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, version)); // TODO: Handle ZkNodeExistsException - SAMZA-1304 zkUtils.getZkClient().createPersistent( @@ -119,7 +119,7 @@ public class ZkBarrierForVersionUpgrade { * @param version Version associated with the Barrier */ public void expire(String version) { - zkUtils.getZkClient().writeData( + zkUtils.writeData( keyBuilder.getBarrierStatePath(version), State.TIMED_OUT); @@ -150,8 +150,8 @@ public class ZkBarrierForVersionUpgrade { if (currentChildren.size() == names.size() && CollectionUtils.containsAll(currentChildren, names)) { String barrierDonePath = keyBuilder.getBarrierStatePath(barrierVersion); LOG.info("Writing BARRIER DONE to " + barrierDonePath); - zkUtils.getZkClient().writeData(barrierDonePath, State.DONE); // this will trigger notifications - 
zkUtils.getZkClient().unsubscribeChildChanges(barrierDonePath, this); + zkUtils.writeData(barrierDonePath, State.DONE); // this will trigger notifications + zkUtils.unsubscribeChildChanges(barrierDonePath, this); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index cb32252..f2fc3de 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -21,11 +21,13 @@ package org.apache.samza.zk; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MetricsConfig; import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorListener; @@ -33,9 +35,13 @@ import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.coordinator.LeaderElector; import org.apache.samza.coordinator.LeaderElectorListener; import org.apache.samza.job.model.JobModel; +import org.apache.samza.metrics.MetricsReporter; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.ReadableMetricsRegistry; import org.apache.samza.runtime.ProcessorIdGenerator; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.util.ClassLoaderHelper; +import org.apache.samza.util.MetricsReporterLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,32 +54,33 @@ public class 
ZkJobCoordinator implements JobCoordinator, ZkControllerListener { // with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197 private static final int METADATA_CACHE_TTL_MS = 5000; - private final ZkUtils zkUtils; private final String processorId; private final ZkController zkController; private final Config config; private final ZkBarrierForVersionUpgrade barrier; + private final ZkJobCoordinatorMetrics metrics; + private final Map<String, MetricsReporter> reporters; private StreamMetadataCache streamMetadataCache = null; private ScheduleAfterDebounceTime debounceTimer = null; private JobCoordinatorListener coordinatorListener = null; private JobModel newJobModel; - private int debounceTimeMs; - public ZkJobCoordinator(Config config) { + public ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry) { this.config = config; ZkConfig zkConfig = new ZkConfig(config); ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId()); + this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry); this.zkUtils = new ZkUtils( keyBuilder, ZkCoordinationServiceFactory.createZkClient( zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()), - zkConfig.getZkConnectionTimeoutMs()); + zkConfig.getZkConnectionTimeoutMs(), metrics); this.processorId = createProcessorId(config); LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils); @@ -84,11 +91,13 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { zkUtils, new ZkBarrierListenerImpl()); this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs(); + this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId); } @Override public void start() { + startMetrics(); streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config); debounceTimer = new ScheduleAfterDebounceTime(throwable -> { @@ -104,15 +113,30 @@ 
public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { if (coordinatorListener != null) { coordinatorListener.onJobModelExpired(); } - + //Setting the isLeader metric to false when the stream processor shuts down because it does not remain the leader anymore + metrics.isLeader.set(false); debounceTimer.stopScheduler(); zkController.stop(); + shutdownMetrics(); if (coordinatorListener != null) { coordinatorListener.onCoordinatorStop(); } } + private void startMetrics() { + for (MetricsReporter reporter: reporters.values()) { + reporter.register("job-coordinator-" + processorId, (ReadableMetricsRegistry) metrics.getMetricsRegistry()); + reporter.start(); + } + } + + private void shutdownMetrics() { + for (MetricsReporter reporter: reporters.values()) { + reporter.stop(); + } + } + @Override public void setListener(JobCoordinatorListener listener) { this.coordinatorListener = listener; @@ -143,7 +167,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { // Generate the JobModel JobModel jobModel = generateNewJobModel(currentProcessorIds); - // Assign the next version of JobModel String currentJMVersion = zkUtils.getJobModelVersion(); String nextJMVersion; @@ -171,6 +194,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> { LOG.info("pid=" + processorId + "new JobModel available"); + // stop current work if (coordinatorListener != null) { coordinatorListener.onJobModelExpired(); @@ -236,6 +260,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { @Override public void onBecomingLeader() { LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!"); + metrics.isLeader.set(true); zkController.subscribeToProcessorChange(); debounceTimer.scheduleAfterDebounceTime( ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, @@ -248,8 +273,14 @@ public class 
ZkJobCoordinator implements JobCoordinator, ZkControllerListener { class ZkBarrierListenerImpl implements ZkBarrierListener { private final String barrierAction = "BarrierAction"; + private long startTime = 0; + @Override public void onBarrierCreated(String version) { + // Start the timer for rebalancing + startTime = System.nanoTime(); + + metrics.barrierCreation.inc(); debounceTimer.scheduleAfterDebounceTime( barrierAction, (new ZkConfig(config)).getZkBarrierTimeoutMs(), @@ -259,6 +290,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { public void onBarrierStateChanged(final String version, ZkBarrierForVersionUpgrade.State state) { LOG.info("JobModel version " + version + " obtained consensus successfully!"); + metrics.barrierStateChange.inc(); + metrics.singleBarrierRebalancingTime.update(System.nanoTime() - startTime); if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) { debounceTimer.scheduleAfterDebounceTime( barrierAction, @@ -278,6 +311,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { @Override public void onBarrierError(String version, Throwable t) { LOG.error("Encountered error while attaining consensus on JobModel version " + version); + metrics.barrierError.inc(); stop(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index d2e0d14..c077f94 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -22,6 +22,8 @@ package org.apache.samza.zk; import org.apache.samza.config.Config; import org.apache.samza.coordinator.JobCoordinator; import 
org.apache.samza.coordinator.JobCoordinatorFactory; +import org.apache.samza.metrics.MetricsRegistryMap; + public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { /** @@ -32,6 +34,6 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { */ @Override public JobCoordinator getJobCoordinator(Config config) { - return new ZkJobCoordinator(config); + return new ZkJobCoordinator(config, new MetricsRegistryMap()); } } http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java new file mode 100644 index 0000000..3437602 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.zk; + + +import org.apache.samza.metrics.MetricsBase; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.Timer; + + +public class ZkJobCoordinatorMetrics extends MetricsBase { + + private final MetricsRegistry metricsRegistry; + + public final Counter reads; + public final Counter writes; + public final Counter subscriptions; + public final Counter zkConnectionError; + + /** + * Denotes if the processor is a leader or not + */ + public final Gauge<Boolean> isLeader; + + /** + * Number of times a barrier was created by the leader + */ + public final Counter barrierCreation; + + /** + * Number of times the barrier state changed + */ + public final Counter barrierStateChange; + + /** + * Number of times the barrier encountered an error while attaining consensus on the job model version + */ + public final Counter barrierError; + + /** + * Average time taken for all the processors to get the latest version of the job model after single + * processor change (without the occurrence of a barrier timeout) + */ + public final Timer singleBarrierRebalancingTime; + + public ZkJobCoordinatorMetrics(MetricsRegistry metricsRegistry) { + super(metricsRegistry); + this.metricsRegistry = metricsRegistry; + this.reads = newCounter("reads"); + this.writes = newCounter("writes"); + this.subscriptions = newCounter("subscriptions"); + this.zkConnectionError = newCounter("zk-connection-error"); + this.isLeader = newGauge("is-leader", false); + this.barrierCreation = newCounter("barrier-creation"); + this.barrierStateChange = newCounter("barrier-state-change"); + this.barrierError = newCounter("barrier-error"); + this.singleBarrierRebalancingTime = newTimer("single-barrier-rebalancing-time"); + } + + public MetricsRegistry getMetricsRegistry() { + return this.metricsRegistry; + } + +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 677ce54..6560cb4 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -65,6 +65,7 @@ public class ZkUtils { private volatile String ephemeralPath = null; private final ZkKeyBuilder keyBuilder; private final int connectionTimeoutMs; + private ZkJobCoordinatorMetrics metrics; public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) { this.keyBuilder = zkKeyBuilder; @@ -72,9 +73,17 @@ public class ZkUtils { this.zkClient = zkClient; } + public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, ZkJobCoordinatorMetrics metrics) { + this.keyBuilder = zkKeyBuilder; + this.connectionTimeoutMs = connectionTimeoutMs; + this.zkClient = zkClient; + this.metrics = metrics; + } + public void connect() throws ZkInterruptedException { boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS); if (!isConnected) { + metrics.zkConnectionError.inc(); throw new RuntimeException("Unable to connect to Zookeeper within connectionTimeout " + connectionTimeoutMs + "ms. 
Shutting down!"); } } @@ -135,6 +144,7 @@ public class ZkUtils { */ String readProcessorData(String fullPath) { String data = zkClient.<String>readData(fullPath, true); + metrics.reads.inc(); if (data == null) { throw new SamzaException(String.format("Cannot read ZK node:", fullPath)); } @@ -177,6 +187,21 @@ public class ZkUtils { public void subscribeDataChanges(String path, IZkDataListener dataListener) { zkClient.subscribeDataChanges(path, dataListener); + metrics.subscriptions.inc(); + } + + public void subscribeChildChanges(String path, IZkChildListener listener) { + zkClient.subscribeChildChanges(path, listener); + metrics.subscriptions.inc(); + } + + public void unsubscribeChildChanges(String path, IZkChildListener childListener) { + zkClient.unsubscribeChildChanges(path, childListener); + } + + public void writeData(String path, Object object) { + zkClient.writeData(path, object); + metrics.writes.inc(); } public boolean exists(String path) { @@ -194,6 +219,7 @@ public class ZkUtils { public void subscribeToJobModelVersionChange(IZkDataListener dataListener) { LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath()); zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener); + metrics.subscriptions.inc(); } /** @@ -224,6 +250,7 @@ public class ZkUtils { public JobModel getJobModel(String jobModelVersion) { LOG.info("read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion)); Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion)); + metrics.reads.inc(); ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper(); JobModel jm; try { @@ -250,6 +277,7 @@ public class ZkUtils { public void publishJobModelVersion(String oldVersion, String newVersion) { Stat stat = new Stat(); String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat); + metrics.reads.inc(); LOG.info("publishing new version: " + newVersion + "; 
oldVersion = " + oldVersion + "(" + stat .getVersion() + ")"); @@ -261,6 +289,7 @@ public class ZkUtils { int dataVersion = stat.getVersion(); try { stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion); + metrics.writes.inc(); } catch (Exception e) { String msg = "publish job model version failed for new version = " + newVersion + "; old version = " + oldVersion; LOG.error(msg, e); @@ -290,5 +319,6 @@ public class ZkUtils { public void subscribeToProcessorChange(IZkChildListener listener) { LOG.info("subscribing for child change at:" + keyBuilder.getProcessorsPath()); zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener); + metrics.subscriptions.inc(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 4f5df94..3bf5c95 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -668,6 +668,7 @@ class SamzaContainer( try { info("Starting container.") + val startTime = System.nanoTime() status = SamzaContainerStatus.STARTING jmxServer = new JmxServer() @@ -689,6 +690,7 @@ class SamzaContainer( if (containerListener != null) { containerListener.onContainerStart() } + metrics.containerStartupTime.update(System.nanoTime() - startTime) runLoop.run } catch { case e: Throwable => http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala index 18664d8..d080939 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala @@ -37,6 +37,7 @@ class SamzaContainerMetrics( val processNs = newTimer("process-ns") val commitNs = newTimer("commit-ns") val blockNs = newTimer("block-ns") + val containerStartupTime = newTimer("container-startup-time") val utilization = newGauge("event-loop-utilization", 0.0F) val diskUsageBytes = newGauge("disk-usage-bytes", 0L) val diskQuotaBytes = newGauge("disk-quota-bytes", Long.MaxValue) http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java index 63d6663..49cd280 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java @@ -22,6 +22,7 @@ import junit.framework.Assert; import org.I0Itec.zkclient.ZkClient; import org.apache.samza.config.ZkConfig; import org.apache.samza.testUtils.EmbeddedZookeeper; +import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -52,9 +53,9 @@ public class TestZkBarrierForVersionUpgrade { @Before public void testSetup() { ZkClient zkClient = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); - this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); + this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, 
ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry())); ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); - this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); + this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry())); } @After http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index 393d733..993297b 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -28,6 +28,7 @@ import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.samza.SamzaException; import org.apache.samza.testUtils.EmbeddedZookeeper; +import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -436,6 +437,6 @@ public class TestZkLeaderElector { return new ZkUtils( KEY_BUILDER, zkClient, - CONNECTION_TIMEOUT_MS); + CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry())); } } http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index b7a0eb8..9e33484 100644 --- 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -31,6 +31,7 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.testUtils.EmbeddedZookeeper; +import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -70,7 +71,7 @@ public class TestZkUtils { zkUtils = new ZkUtils( KEY_BUILDER, zkClient, - SESSION_TIMEOUT_MS); + SESSION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry())); zkUtils.connect(); } @@ -116,7 +117,7 @@ public class TestZkUtils { Assert.assertEquals(" ID1 didn't match", "1", l.get(0)); Assert.assertEquals(" ID2 didn't match", "2", l.get(1)); } - + @Test public void testSubscribeToJobModelVersionChange() {
