This is an automated email from the ASF dual-hosted git repository. cameronlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new e82fc46 SAMZA-2199: [Scala cleanup] Convert MetricsConfig to Java (#1040) e82fc46 is described below commit e82fc46e14a9498cc912e00e9e2ae31571c81ad8 Author: Cameron Lee <ca...@linkedin.com> AuthorDate: Tue Jun 11 16:24:44 2019 -0700 SAMZA-2199: [Scala cleanup] Convert MetricsConfig to Java (#1040) --- .../clustermanager/ContainerProcessManager.java | 2 +- .../org/apache/samza/config/MetricsConfig.java | 85 +++++++++++++++ .../apache/samza/processor/StreamProcessor.java | 2 +- .../apache/samza/runtime/ContainerLaunchUtil.java | 2 +- .../org/apache/samza/util/DiagnosticsUtil.java | 14 +-- .../apache/samza/util/MetricsReporterLoader.java | 21 ++-- .../org/apache/samza/config/MetricsConfig.scala | 84 --------------- .../apache/samza/container/SamzaContainer.scala | 6 +- .../scala/org/apache/samza/job/JobRunner.scala | 9 +- .../metrics/ContainerProcessManagerMetrics.scala | 2 +- .../reporter/MetricsSnapshotReporterFactory.scala | 13 ++- .../org/apache/samza/config/TestMetricsConfig.java | 116 +++++++++++++++++++++ .../samza/table/caching/TestCachingTable.java | 2 +- .../kv/BaseKeyValueStorageEngineFactory.scala | 6 +- .../samza/storage/kv/TestLocalTableRead.java | 2 +- .../samza/storage/kv/TestLocalTableWrite.java | 2 +- .../samza/job/yarn/SamzaAppMasterMetrics.scala | 6 +- 17 files changed, 248 insertions(+), 126 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index 9d2a0e4..f75e217 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -150,7 +150,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback if (diagnosticsManagerReporterPair.isPresent()) { diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey()); - metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(), diagnosticsManagerReporterPair.get().getValue()); + metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS, diagnosticsManagerReporterPair.get().getValue()); } else { diagnosticsManager = Option.empty(); } diff --git a/samza-core/src/main/java/org/apache/samza/config/MetricsConfig.java b/samza-core/src/main/java/org/apache/samza/config/MetricsConfig.java new file mode 100644 index 0000000..44ab3d8 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/config/MetricsConfig.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.config; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +/** + * Helper class for accessing configs related to metrics. + */ +public class MetricsConfig extends MapConfig { + // metrics config constants + public static final String METRICS_REPORTERS = "metrics.reporters"; + public static final String METRICS_REPORTER_FACTORY = "metrics.reporter.%s.class"; + // This flag enables the common timer metrics, e.g. process_ns + public static final String METRICS_TIMER_ENABLED = "metrics.timer.enabled"; + // This flag enables more timer metrics, e.g. handle-message-ns in an operator, for debugging purpose + public static final String METRICS_TIMER_DEBUG_ENABLED = "metrics.timer.debug.enabled"; + + // The following configs are applicable only to {@link MetricsSnapshotReporter} + // added here only to maintain backwards compatibility of config + public static final String METRICS_SNAPSHOT_REPORTER_STREAM = "metrics.reporter.%s.stream"; + // unit for this config is seconds + public static final String METRICS_SNAPSHOT_REPORTER_INTERVAL = "metrics.reporter.%s.interval"; + static final int DEFAULT_METRICS_SNAPSHOT_REPORTER_INTERVAL = 60; + public static final String METRICS_SNAPSHOT_REPORTER_BLACKLIST = "metrics.reporter.%s.blacklist"; + public static final String METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS = "diagnosticsreporter"; + + public MetricsConfig(Config config) { + super(config); + } + + public Optional<String> getMetricsFactoryClass(String name) { + return Optional.ofNullable(get(String.format(METRICS_REPORTER_FACTORY, name))); + } + + public Optional<String> getMetricsSnapshotReporterStream(String name) { + return Optional.ofNullable(get(String.format(METRICS_SNAPSHOT_REPORTER_STREAM, name))); + } + + public int getMetricsSnapshotReporterInterval(String name) { + return getInt(String.format(METRICS_SNAPSHOT_REPORTER_INTERVAL, name), DEFAULT_METRICS_SNAPSHOT_REPORTER_INTERVAL); + } + + public Optional<String> getMetricsSnapshotReporterBlacklist(String name) { + return Optional.ofNullable(get(String.format(METRICS_SNAPSHOT_REPORTER_BLACKLIST, name))); + } + + public List<String> getMetricReporterNames() { + Optional<String> metricReporterNamesValue = Optional.ofNullable(get(METRICS_REPORTERS)); + if (!metricReporterNamesValue.isPresent() || metricReporterNamesValue.get().isEmpty()) { + return Collections.emptyList(); + } else { + return Stream.of(metricReporterNamesValue.get().split(",")).map(String::trim).collect(Collectors.toList()); + } + } + + public boolean getMetricsTimerEnabled() { + return getBoolean(METRICS_TIMER_ENABLED, true); + } + + public boolean getMetricsTimerDebugEnabled() { + return getBoolean(METRICS_TIMER_DEBUG_ENABLED, false); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 3ba200b..c90c8f4 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -345,7 +345,7 @@ public class StreamProcessor { Option<DiagnosticsManager> diagnosticsManager = Option.empty(); if (diagnosticsManagerReporterPair.isPresent()) { diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey()); - this.customMetricsReporter.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(), diagnosticsManagerReporterPair.get().getValue()); + this.customMetricsReporter.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS, diagnosticsManagerReporterPair.get().getValue()); } return SamzaContainer.apply(processorId, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter), diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java index ab3409f..e748dd2 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java @@ -103,7 +103,7 @@ public class ContainerLaunchUtil { Option<DiagnosticsManager> diagnosticsManager = Option.empty(); if (diagnosticsManagerReporterPair.isPresent()) { diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey()); - metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(), diagnosticsManagerReporterPair.get().getValue()); + metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS, diagnosticsManagerReporterPair.get().getValue()); } SamzaContainer container = SamzaContainer$.MODULE$.apply( diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java index 0b680c4..c290e6c 100644 --- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java @@ -97,16 +97,16 @@ public class DiagnosticsUtil { if (new JobConfig(config).getDiagnosticsEnabled()) { // Diagnostic stream, producer, and reporter related parameters - String diagnosticsReporterName = MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(); - Integer publishInterval = new MetricsConfig(config).getMetricsSnapshotReporterInterval(diagnosticsReporterName); + String diagnosticsReporterName = MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS; + MetricsConfig metricsConfig = new MetricsConfig(config); + int publishInterval = metricsConfig.getMetricsSnapshotReporterInterval(diagnosticsReporterName); String taskClassVersion = Util.getTaskClassVersion(config); String samzaVersion = Util.getSamzaVersion(); String hostName = Util.getLocalHost().getHostName(); - Option<String> blacklist = new MetricsConfig(config).getMetricsSnapshotReporterBlacklist(diagnosticsReporterName); - Option<String> diagnosticsReporterStreamName = new MetricsConfig(config).getMetricsSnapshotReporterStream(diagnosticsReporterName); + Optional<String> diagnosticsReporterStreamName = metricsConfig.getMetricsSnapshotReporterStream(diagnosticsReporterName); - if (diagnosticsReporterStreamName.isEmpty()) { - throw new ConfigException("Missing required config: " + String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM(), diagnosticsReporterName)); + if (!diagnosticsReporterStreamName.isPresent()) { + throw new ConfigException("Missing required config: " + String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM, diagnosticsReporterName)); } SystemStream diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsReporterStreamName.get()); @@ -122,6 +122,8 @@ public class DiagnosticsUtil { DiagnosticsManager diagnosticsManager = new DiagnosticsManager(jobName, jobId, containerId, execEnvContainerId.orElse(""), taskClassVersion, samzaVersion, hostName, diagnosticsSystemStream, systemProducer, Duration.ofMillis(new TaskConfig(config).getShutdownMs())); + Option<String> blacklist = ScalaJavaUtil.JavaOptionals$.MODULE$.toRichOptional( + metricsConfig.getMetricsSnapshotReporterBlacklist(diagnosticsReporterName)).toOption(); MetricsSnapshotReporter diagnosticsReporter = new MetricsSnapshotReporter(systemProducer, diagnosticsSystemStream, publishInterval, jobName, jobId, "samza-container-" + containerId, taskClassVersion, samzaVersion, hostName, new MetricsSnapshotSerdeV2(), diff --git a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java index 72cef10..788b252 100644 --- a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java +++ b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java @@ -26,7 +26,7 @@ import org.apache.samza.SamzaException; import org.apache.samza.config.MetricsConfig; import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.metrics.MetricsReporterFactory; -import scala.collection.JavaConverters; + /** * Helper class that instantiates the MetricsReporter. @@ -36,28 +36,29 @@ public class MetricsReporterLoader { private MetricsReporterLoader() { } - public static Map<String, MetricsReporter> getMetricsReporters(MetricsConfig config, String containerName, + public static Map<String, MetricsReporter> getMetricsReporters(MetricsConfig metricsConfig, String containerName, ClassLoader classLoader) { Map<String, MetricsReporter> metricsReporters = new HashMap<>(); - String diagnosticsReporterName = MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(); + String diagnosticsReporterName = MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS; // Exclude creation of diagnostics-reporter, because it is created manually in SamzaContainer (to allow sharing of // sysProducer between reporter and diagnosticsManager - List<String> metricsReporterNames = JavaConverters.seqAsJavaListConverter(config.getMetricReporterNames()).asJava(). - stream().filter(reporterName -> !reporterName.equals(diagnosticsReporterName)).collect(Collectors.toList()); + List<String> metricsReporterNames = metricsConfig.getMetricReporterNames() + .stream() + .filter(reporterName -> !reporterName.equals(diagnosticsReporterName)) + .collect(Collectors.toList()); for (String metricsReporterName : metricsReporterNames) { - String metricsFactoryClassName = config.getMetricsFactoryClass(metricsReporterName).get(); - if (metricsFactoryClassName == null) { - throw new SamzaException(String.format("Metrics reporter %s missing .class config", metricsReporterName)); - } + String metricsFactoryClassName = metricsConfig.getMetricsFactoryClass(metricsReporterName) + .orElseThrow(() -> new SamzaException( + String.format("Metrics reporter %s missing .class config", metricsReporterName))); MetricsReporterFactory metricsReporterFactory = ReflectionUtil.getObj(classLoader, metricsFactoryClassName, MetricsReporterFactory.class); metricsReporters.put(metricsReporterName, metricsReporterFactory.getMetricsReporter(metricsReporterName, containerName, - config)); + metricsConfig)); } return metricsReporters; } diff --git a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala deleted file mode 100644 index d41f5fb..0000000 --- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.config - - -import org.apache.samza.util.HighResolutionClock - - -object MetricsConfig { - // metrics config constants - val METRICS_REPORTERS = "metrics.reporters" - val METRICS_REPORTER_FACTORY = "metrics.reporter.%s.class" - // This flag enables the common timer metrics, e.g. process_ns - val METRICS_TIMER_ENABLED= "metrics.timer.enabled" - // This flag enables more timer metrics, e.g. handle-message-ns in an operator, for debugging purpose - val METRICS_TIMER_DEBUG_ENABLED= "metrics.timer.debug.enabled" - - // The following configs are applicable only to {@link MetricsSnapshotReporter} - // added here only to maintain backwards compatibility of config - val METRICS_SNAPSHOT_REPORTER_STREAM = "metrics.reporter.%s.stream" - val METRICS_SNAPSHOT_REPORTER_INTERVAL= "metrics.reporter.%s.interval" - val METRICS_SNAPSHOT_REPORTER_BLACKLIST = "metrics.reporter.%s.blacklist" - val METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS = "diagnosticsreporter" - - implicit def Config2Metrics(config: Config) = new MetricsConfig(config) -} - -class MetricsConfig(config: Config) extends ScalaMapConfig(config) { - def getMetricsReporters(): Option[String] = getOption(MetricsConfig.METRICS_REPORTERS) - - def getMetricsFactoryClass(name: String): Option[String] = getOption(MetricsConfig.METRICS_REPORTER_FACTORY format name) - - def getMetricsSnapshotReporterStream(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM format name) - - def getMetricsSnapshotReporterInterval(name: String): Int = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL format name).getOrElse("60").toInt - - def getMetricsSnapshotReporterBlacklist(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_BLACKLIST format name) - - /** - * Returns a list of all metrics names from the config file. Useful for - * getting individual metrics. - */ - def getMetricReporterNames() = { - getMetricsReporters match { - case Some(mr) => if (!"".equals(mr)) { - mr.split(",").map(name => name.trim).toList - } else { - List[String]() - } - case _ => List[String]() - } - } - - /** - * Returns the flag to turn on/off the timer metrics. - * @return Boolean flag to enable the timer metrics - */ - def getMetricsTimerEnabled: Boolean = getBoolean(MetricsConfig.METRICS_TIMER_ENABLED, true) - - /** - * Returns the flag to enable the debug timer metrics. These metrics - * are turned off by default for better performance. - * @return Boolean - */ - def getMetricsTimerDebugEnabled: Boolean = getBoolean(MetricsConfig.METRICS_TIMER_DEBUG_ENABLED, false) - -} \ No newline at end of file diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index b55f057..a90ab93 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -32,7 +32,6 @@ import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.samza.checkpoint.{CheckpointListener, OffsetManager, OffsetManagerMetrics} import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.MetricsConfig.Config2Metrics import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config._ import org.apache.samza.container.disk.DiskSpaceMonitor.Listener @@ -157,7 +156,8 @@ object SamzaContainer extends Logging { val systemProducersMetrics = new SystemProducersMetrics(registry) val systemConsumersMetrics = new SystemConsumersMetrics(registry) val offsetManagerMetrics = new OffsetManagerMetrics(registry) - val clock = if (config.getMetricsTimerEnabled) { + val metricsConfig = new MetricsConfig(config) + val clock = if (metricsConfig.getMetricsTimerEnabled) { new HighResolutionClock { override def nanoTime(): Long = System.nanoTime() } @@ -410,7 +410,7 @@ object SamzaContainer extends Logging { info("Setting up metrics reporters.") val reporters = - MetricsReporterLoader.getMetricsReporters(config, containerName, classLoader).asScala.toMap ++ customReporters + MetricsReporterLoader.getMetricsReporters(metricsConfig, containerName, classLoader).asScala.toMap ++ customReporters info("Got metrics reporters: %s" format reporters.keys) diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index 1e489ed..9c86e91 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -29,6 +29,7 @@ import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.runtime.ApplicationRunnerMain.ApplicationRunnerCommandLine import org.apache.samza.runtime.ApplicationRunnerOperation import org.apache.samza.system.{StreamSpec, SystemAdmins} +import org.apache.samza.util.ScalaJavaUtil.JavaOptionals import org.apache.samza.util.{CoordinatorStreamUtil, Logging, StreamUtil, Util} import scala.collection.JavaConverters._ @@ -113,9 +114,11 @@ class JobRunner(config: Config) extends Logging { // if diagnostics is enabled, create diagnostics stream if it doesnt exist if (new JobConfig(config).getDiagnosticsEnabled) { val DIAGNOSTICS_STREAM_ID = "samza-diagnostics-stream-id" - val diagnosticsSystemStreamName = new MetricsConfig(config). - getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS). - getOrElse(throw new ConfigException("Missing required config: " + + val diagnosticsSystemStreamName = JavaOptionals.toRichOptional( + new MetricsConfig(config) + .getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)) + .toOption + .getOrElse(throw new ConfigException("Missing required config: " + String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM, MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS))) diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala index a21668b..7bde882 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala @@ -20,7 +20,7 @@ package org.apache.samza.metrics import org.apache.samza.clustermanager.SamzaApplicationState -import org.apache.samza.config.{ClusterManagerConfig, Config} +import org.apache.samza.config.{ClusterManagerConfig, Config, MetricsConfig} import org.apache.samza.util.Logging /** diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala index d45b27a..6d77e51 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala @@ -21,9 +21,8 @@ package org.apache.samza.metrics.reporter import org.apache.samza.util.{Logging, StreamUtil, Util} import org.apache.samza.SamzaException -import org.apache.samza.config.{Config, SerializerConfig, SystemConfig} +import org.apache.samza.config.{Config, MetricsConfig, SerializerConfig, SystemConfig} import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.MetricsConfig.Config2Metrics import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.metrics.MetricsReporter import org.apache.samza.metrics.MetricsReporterFactory @@ -43,8 +42,9 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging val jobId = config .getJobId - val metricsSystemStreamName = config - .getMetricsSnapshotReporterStream(name) + val metricsConfig = new MetricsConfig(config) + val metricsSystemStreamName = JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterStream(name)) + .toOption .getOrElse(throw new SamzaException("No metrics stream defined in config.")) val systemStream = StreamUtil.getSystemStreamFromNames(metricsSystemStreamName) @@ -83,12 +83,11 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging info("Got serde %s." format serde) - val pollingInterval: Int = config - .getMetricsSnapshotReporterInterval(name) + val pollingInterval: Int = metricsConfig.getMetricsSnapshotReporterInterval(name) info("Setting polling interval to %d" format pollingInterval) - val blacklist = config.getMetricsSnapshotReporterBlacklist(name) + val blacklist = JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterBlacklist(name)).toOption info("Setting blacklist to %s" format blacklist) val reporter = new MetricsSnapshotReporter( diff --git a/samza-core/src/test/java/org/apache/samza/config/TestMetricsConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestMetricsConfig.java new file mode 100644 index 0000000..f4e889c --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/config/TestMetricsConfig.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.config; + +import java.util.Collections; +import java.util.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + + +public class TestMetricsConfig { + @Test + public void testGetMetricsFactoryClass() { + String metricsReporterName = "metricReporterName"; + String metricsReporterValue = "metrics.reporter.class"; + Config config = new MapConfig( + ImmutableMap.of(String.format(MetricsConfig.METRICS_REPORTER_FACTORY, metricsReporterName), + metricsReporterValue)); + assertEquals(Optional.of(metricsReporterValue), + new MetricsConfig(config).getMetricsFactoryClass(metricsReporterName)); + + assertFalse(metricsReporterValue, + new MetricsConfig(new MapConfig()).getMetricsFactoryClass("someName").isPresent()); + } + + @Test + public void testGetMetricsSnapshotReporterStream() { + String metricsReporterName = "metricReporterName"; + String value = "reporter-stream"; + Config config = new MapConfig( + ImmutableMap.of(String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM, metricsReporterName), value)); + assertEquals(Optional.of(value), new MetricsConfig(config).getMetricsSnapshotReporterStream(metricsReporterName)); + + assertFalse(value, new MetricsConfig(new MapConfig()).getMetricsSnapshotReporterStream("someName").isPresent()); + } + + @Test + public void testGetMetricsSnapshotReporterInterval() { + String metricsReporterName = "metricReporterName"; + String value = "10"; + Config config = new MapConfig( + ImmutableMap.of(String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL, metricsReporterName), value)); + assertEquals(10, new MetricsConfig(config).getMetricsSnapshotReporterInterval(metricsReporterName)); + + assertEquals(MetricsConfig.DEFAULT_METRICS_SNAPSHOT_REPORTER_INTERVAL, + new MetricsConfig(new MapConfig()).getMetricsSnapshotReporterInterval("someName")); + } + + @Test + public void testGetMetricsSnapshotReporterBlacklist() { + String metricsReporterName = "metricReporterName"; + String value = "metric0|metric1"; + Config config = new MapConfig( + ImmutableMap.of(String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_BLACKLIST, metricsReporterName), value)); + assertEquals(Optional.of(value), + new MetricsConfig(config).getMetricsSnapshotReporterBlacklist(metricsReporterName)); + + assertFalse(value, new MetricsConfig(new MapConfig()).getMetricsSnapshotReporterBlacklist("someName").isPresent()); + } + + @Test + public void testGetMetricReporterNames() { + Config config = new MapConfig( + ImmutableMap.of(MetricsConfig.METRICS_REPORTERS, "reporter0.class, reporter1.class, reporter2.class ")); + assertEquals(ImmutableList.of("reporter0.class", "reporter1.class", "reporter2.class"), + new MetricsConfig(config).getMetricReporterNames()); + + Config configEmptyValue = new MapConfig(ImmutableMap.of(MetricsConfig.METRICS_REPORTERS, "")); + assertEquals(Collections.emptyList(), new MetricsConfig(configEmptyValue).getMetricReporterNames()); + + assertEquals(Collections.emptyList(), new MetricsConfig(new MapConfig()).getMetricReporterNames()); + } + + @Test + public void testGetMetricsTimerEnabled() { + Config config = new MapConfig(ImmutableMap.of(MetricsConfig.METRICS_TIMER_ENABLED, "true")); + assertTrue(new MetricsConfig(config).getMetricsTimerEnabled()); + + config = new MapConfig(ImmutableMap.of(MetricsConfig.METRICS_TIMER_ENABLED, "false")); + assertFalse(new MetricsConfig(config).getMetricsTimerEnabled()); + + assertTrue(new MetricsConfig(new MapConfig()).getMetricsTimerEnabled()); + } + + @Test + public void testGetMetricsTimerDebugEnabled() { + Config config = new MapConfig(ImmutableMap.of(MetricsConfig.METRICS_TIMER_DEBUG_ENABLED, "true")); + assertTrue(new MetricsConfig(config).getMetricsTimerDebugEnabled()); + + config = new MapConfig(ImmutableMap.of(MetricsConfig.METRICS_TIMER_DEBUG_ENABLED, "false")); + assertFalse(new MetricsConfig(config).getMetricsTimerDebugEnabled()); + + assertFalse(new MetricsConfig(new MapConfig()).getMetricsTimerDebugEnabled()); + } +} \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java index d873c16..e436a06 100644 --- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java @@ -144,7 +144,7 @@ public class TestCachingTable { private void initTables(boolean isTimerMetricsDisabled, ReadWriteTable ... tables) { Map<String, String> config = new HashMap<>(); if (isTimerMetricsDisabled) { - config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false"); + config.put(MetricsConfig.METRICS_TIMER_ENABLED, "false"); } Context context = new MockContext(); doReturn(new MapConfig(config)).when(context.getJobContext()).getConfig(); diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala index fdb473a..b85ff1a 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala @@ -22,8 +22,7 @@ package org.apache.samza.storage.kv import java.io.File import org.apache.samza.SamzaException -import org.apache.samza.config.MetricsConfig.Config2Metrics -import org.apache.samza.config.StorageConfig +import org.apache.samza.config.{MetricsConfig, StorageConfig} import org.apache.samza.context.{ContainerContext, JobContext} import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.serializers.Serde @@ -148,7 +147,8 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] // create the storage engine and return // TODO: Decide if we should use raw bytes when restoring val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry) - val clock = if (jobContext.getConfig.getMetricsTimerEnabled) { + val metricsConfig = new MetricsConfig(jobContext.getConfig) + val clock = if (metricsConfig.getMetricsTimerEnabled) { new HighResolutionClock { override def nanoTime(): Long = System.nanoTime() } diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java index 0fd4539..3e4f50e 100644 --- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java @@ -137,7 +137,7 @@ public class TestLocalTableRead { private LocalTable createTable(boolean isTimerDisabled) { Map<String, String> config = new HashMap<>(); if (isTimerDisabled) { - config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false"); + config.put(MetricsConfig.METRICS_TIMER_ENABLED, "false"); } Context context = mock(Context.class); JobContext jobContext = mock(JobContext.class); diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java index 15cba1b..9bd46ba 100644 --- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java @@ -230,7 +230,7 @@ public class TestLocalTableWrite { private LocalTable createTable(boolean isTimerDisabled) { Map<String, String> config = new HashMap<>(); if (isTimerDisabled) { - config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false"); + config.put(MetricsConfig.METRICS_TIMER_ENABLED, "false"); } Context context = mock(Context.class); JobContext jobContext = mock(JobContext.class); diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala index c5830a7..72dd70f 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala @@ -20,8 +20,7 @@ package org.apache.samza.job.yarn import org.apache.samza.clustermanager.SamzaApplicationState -import org.apache.samza.config.Config -import org.apache.samza.config.MetricsConfig.Config2Metrics +import org.apache.samza.config.{Config, MetricsConfig} import org.apache.samza.util.Logging import org.apache.samza.util.MetricsReporterLoader import org.apache.samza.metrics.ReadableMetricsRegistry @@ -43,8 +42,9 @@ class SamzaAppMasterMetrics(val config: Config, val registry: ReadableMetricsRegistry, val classLoader: ClassLoader) extends MetricsHelper with Logging { + private val metricsConfig = new MetricsConfig(config) val reporters = - MetricsReporterLoader.getMetricsReporters(config, SamzaAppMasterMetrics.sourceName, classLoader).asScala + MetricsReporterLoader.getMetricsReporters(metricsConfig, SamzaAppMasterMetrics.sourceName, classLoader).asScala reporters.values.foreach(_.register(SamzaAppMasterMetrics.sourceName, registry)) def start() {