This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new e82fc46  SAMZA-2199: [Scala cleanup] Convert MetricsConfig to Java (#1040)
e82fc46 is described below

commit e82fc46e14a9498cc912e00e9e2ae31571c81ad8
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Tue Jun 11 16:24:44 2019 -0700

    SAMZA-2199: [Scala cleanup] Convert MetricsConfig to Java (#1040)
---
 .../clustermanager/ContainerProcessManager.java    |   2 +-
 .../org/apache/samza/config/MetricsConfig.java     |  85 +++++++++++++++
 .../apache/samza/processor/StreamProcessor.java    |   2 +-
 .../apache/samza/runtime/ContainerLaunchUtil.java  |   2 +-
 .../org/apache/samza/util/DiagnosticsUtil.java     |  14 +--
 .../apache/samza/util/MetricsReporterLoader.java   |  21 ++--
 .../org/apache/samza/config/MetricsConfig.scala    |  84 ---------------
 .../apache/samza/container/SamzaContainer.scala    |   6 +-
 .../scala/org/apache/samza/job/JobRunner.scala     |   9 +-
 .../metrics/ContainerProcessManagerMetrics.scala   |   2 +-
 .../reporter/MetricsSnapshotReporterFactory.scala  |  13 ++-
 .../org/apache/samza/config/TestMetricsConfig.java | 116 +++++++++++++++++++++
 .../samza/table/caching/TestCachingTable.java      |   2 +-
 .../kv/BaseKeyValueStorageEngineFactory.scala      |   6 +-
 .../samza/storage/kv/TestLocalTableRead.java       |   2 +-
 .../samza/storage/kv/TestLocalTableWrite.java      |   2 +-
 .../samza/job/yarn/SamzaAppMasterMetrics.scala     |   6 +-
 17 files changed, 248 insertions(+), 126 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 9d2a0e4..f75e217 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -150,7 +150,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
 
     if (diagnosticsManagerReporterPair.isPresent()) {
       diagnosticsManager = 
Option.apply(diagnosticsManagerReporterPair.get().getKey());
-      
metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(),
 diagnosticsManagerReporterPair.get().getValue());
+      
metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS,
 diagnosticsManagerReporterPair.get().getValue());
     } else {
       diagnosticsManager = Option.empty();
     }
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/MetricsConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/MetricsConfig.java
new file mode 100644
index 0000000..44ab3d8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/MetricsConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+
+/**
+ * Helper class for accessing configs related to metrics.
+ */
+public class MetricsConfig extends MapConfig {
+  // metrics config constants
+  public static final String METRICS_REPORTERS = "metrics.reporters";
+  public static final String METRICS_REPORTER_FACTORY = 
"metrics.reporter.%s.class";
+  // This flag enables the common timer metrics, e.g. process_ns
+  public static final String METRICS_TIMER_ENABLED = "metrics.timer.enabled";
+  // This flag enables more timer metrics, e.g. handle-message-ns in an 
operator, for debugging purpose
+  public static final String METRICS_TIMER_DEBUG_ENABLED = 
"metrics.timer.debug.enabled";
+
+  // The following configs are applicable only to {@link 
MetricsSnapshotReporter}
+  // added here only to maintain backwards compatibility of config
+  public static final String METRICS_SNAPSHOT_REPORTER_STREAM = 
"metrics.reporter.%s.stream";
+  // unit for this config is seconds
+  public static final String METRICS_SNAPSHOT_REPORTER_INTERVAL = 
"metrics.reporter.%s.interval";
+  static final int DEFAULT_METRICS_SNAPSHOT_REPORTER_INTERVAL = 60;
+  public static final String METRICS_SNAPSHOT_REPORTER_BLACKLIST = 
"metrics.reporter.%s.blacklist";
+  public static final String METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS = 
"diagnosticsreporter";
+
+  public MetricsConfig(Config config) {
+    super(config);
+  }
+
+  public Optional<String> getMetricsFactoryClass(String name) {
+    return Optional.ofNullable(get(String.format(METRICS_REPORTER_FACTORY, 
name)));
+  }
+
+  public Optional<String> getMetricsSnapshotReporterStream(String name) {
+    return 
Optional.ofNullable(get(String.format(METRICS_SNAPSHOT_REPORTER_STREAM, name)));
+  }
+
+  public int getMetricsSnapshotReporterInterval(String name) {
+    return getInt(String.format(METRICS_SNAPSHOT_REPORTER_INTERVAL, name), 
DEFAULT_METRICS_SNAPSHOT_REPORTER_INTERVAL);
+  }
+
+  public Optional<String> getMetricsSnapshotReporterBlacklist(String name) {
+    return 
Optional.ofNullable(get(String.format(METRICS_SNAPSHOT_REPORTER_BLACKLIST, 
name)));
+  }
+
+  public List<String> getMetricReporterNames() {
+    Optional<String> metricReporterNamesValue = 
Optional.ofNullable(get(METRICS_REPORTERS));
+    if (!metricReporterNamesValue.isPresent() || 
metricReporterNamesValue.get().isEmpty()) {
+      return Collections.emptyList();
+    } else {
+      return 
Stream.of(metricReporterNamesValue.get().split(",")).map(String::trim).collect(Collectors.toList());
+    }
+  }
+
+  public boolean getMetricsTimerEnabled() {
+    return getBoolean(METRICS_TIMER_ENABLED, true);
+  }
+
+  public boolean getMetricsTimerDebugEnabled() {
+    return getBoolean(METRICS_TIMER_DEBUG_ENABLED, false);
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 3ba200b..c90c8f4 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -345,7 +345,7 @@ public class StreamProcessor {
     Option<DiagnosticsManager> diagnosticsManager = Option.empty();
     if (diagnosticsManagerReporterPair.isPresent()) {
       diagnosticsManager = 
Option.apply(diagnosticsManagerReporterPair.get().getKey());
-      
this.customMetricsReporter.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(),
 diagnosticsManagerReporterPair.get().getValue());
+      
this.customMetricsReporter.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS,
 diagnosticsManagerReporterPair.get().getValue());
     }
 
     return SamzaContainer.apply(processorId, jobModel, 
ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java 
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index ab3409f..e748dd2 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -103,7 +103,7 @@ public class ContainerLaunchUtil {
       Option<DiagnosticsManager> diagnosticsManager = Option.empty();
       if (diagnosticsManagerReporterPair.isPresent()) {
         diagnosticsManager = 
Option.apply(diagnosticsManagerReporterPair.get().getKey());
-        
metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(),
 diagnosticsManagerReporterPair.get().getValue());
+        
metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS,
 diagnosticsManagerReporterPair.get().getValue());
       }
 
       SamzaContainer container = SamzaContainer$.MODULE$.apply(
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java 
b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 0b680c4..c290e6c 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -97,16 +97,16 @@ public class DiagnosticsUtil {
     if (new JobConfig(config).getDiagnosticsEnabled()) {
 
       // Diagnostic stream, producer, and reporter related parameters
-      String diagnosticsReporterName = 
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS();
-      Integer publishInterval = new 
MetricsConfig(config).getMetricsSnapshotReporterInterval(diagnosticsReporterName);
+      String diagnosticsReporterName = 
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS;
+      MetricsConfig metricsConfig = new MetricsConfig(config);
+      int publishInterval = 
metricsConfig.getMetricsSnapshotReporterInterval(diagnosticsReporterName);
       String taskClassVersion = Util.getTaskClassVersion(config);
       String samzaVersion = Util.getSamzaVersion();
       String hostName = Util.getLocalHost().getHostName();
-      Option<String> blacklist = new 
MetricsConfig(config).getMetricsSnapshotReporterBlacklist(diagnosticsReporterName);
-      Option<String> diagnosticsReporterStreamName = new 
MetricsConfig(config).getMetricsSnapshotReporterStream(diagnosticsReporterName);
+      Optional<String> diagnosticsReporterStreamName = 
metricsConfig.getMetricsSnapshotReporterStream(diagnosticsReporterName);
 
-      if (diagnosticsReporterStreamName.isEmpty()) {
-        throw new ConfigException("Missing required config: " + 
String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM(), 
diagnosticsReporterName));
+      if (!diagnosticsReporterStreamName.isPresent()) {
+        throw new ConfigException("Missing required config: " + 
String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM, 
diagnosticsReporterName));
       }
 
       SystemStream diagnosticsSystemStream = 
StreamUtil.getSystemStreamFromNames(diagnosticsReporterStreamName.get());
@@ -122,6 +122,8 @@ public class DiagnosticsUtil {
       DiagnosticsManager diagnosticsManager = new DiagnosticsManager(jobName, 
jobId, containerId, execEnvContainerId.orElse(""), taskClassVersion,
           samzaVersion, hostName, diagnosticsSystemStream, systemProducer, 
Duration.ofMillis(new TaskConfig(config).getShutdownMs()));
 
+      Option<String> blacklist = 
ScalaJavaUtil.JavaOptionals$.MODULE$.toRichOptional(
+          
metricsConfig.getMetricsSnapshotReporterBlacklist(diagnosticsReporterName)).toOption();
       MetricsSnapshotReporter diagnosticsReporter =
           new MetricsSnapshotReporter(systemProducer, diagnosticsSystemStream, 
publishInterval, jobName, jobId,
               "samza-container-" + containerId, taskClassVersion, 
samzaVersion, hostName, new MetricsSnapshotSerdeV2(),
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java 
b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
index 72cef10..788b252 100644
--- a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
+++ b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
@@ -26,7 +26,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.MetricsReporterFactory;
-import scala.collection.JavaConverters;
+
 
 /**
  * Helper class that instantiates the MetricsReporter.
@@ -36,28 +36,29 @@ public class MetricsReporterLoader {
   private MetricsReporterLoader() {
   }
 
-  public static Map<String, MetricsReporter> getMetricsReporters(MetricsConfig 
config, String containerName,
+  public static Map<String, MetricsReporter> getMetricsReporters(MetricsConfig 
metricsConfig, String containerName,
       ClassLoader classLoader) {
     Map<String, MetricsReporter> metricsReporters = new HashMap<>();
 
-    String diagnosticsReporterName = 
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS();
+    String diagnosticsReporterName = 
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS;
 
     // Exclude creation of diagnostics-reporter, because it is created 
manually in SamzaContainer (to allow sharing of
     // sysProducer between reporter and diagnosticsManager
-    List<String> metricsReporterNames = 
JavaConverters.seqAsJavaListConverter(config.getMetricReporterNames()).asJava().
-        stream().filter(reporterName -> 
!reporterName.equals(diagnosticsReporterName)).collect(Collectors.toList());
+    List<String> metricsReporterNames = metricsConfig.getMetricReporterNames()
+        .stream()
+        .filter(reporterName -> !reporterName.equals(diagnosticsReporterName))
+        .collect(Collectors.toList());
 
     for (String metricsReporterName : metricsReporterNames) {
-      String metricsFactoryClassName = 
config.getMetricsFactoryClass(metricsReporterName).get();
-      if (metricsFactoryClassName == null) {
-        throw new SamzaException(String.format("Metrics reporter %s missing 
.class config", metricsReporterName));
-      }
+      String metricsFactoryClassName = 
metricsConfig.getMetricsFactoryClass(metricsReporterName)
+          .orElseThrow(() -> new SamzaException(
+              String.format("Metrics reporter %s missing .class config", 
metricsReporterName)));
       MetricsReporterFactory metricsReporterFactory =
           ReflectionUtil.getObj(classLoader, metricsFactoryClassName, 
MetricsReporterFactory.class);
       metricsReporters.put(metricsReporterName,
                            
metricsReporterFactory.getMetricsReporter(metricsReporterName,
                                                                      
containerName,
-                                                                     config));
+                                                                     
metricsConfig));
     }
     return metricsReporters;
   }
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
deleted file mode 100644
index d41f5fb..0000000
--- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config
-
-
-import org.apache.samza.util.HighResolutionClock
-
-
-object MetricsConfig {
-  // metrics config constants
-  val METRICS_REPORTERS = "metrics.reporters"
-  val METRICS_REPORTER_FACTORY = "metrics.reporter.%s.class"
-  // This flag enables the common timer metrics, e.g. process_ns
-  val METRICS_TIMER_ENABLED= "metrics.timer.enabled"
-  // This flag enables more timer metrics, e.g. handle-message-ns in an 
operator, for debugging purpose
-  val METRICS_TIMER_DEBUG_ENABLED= "metrics.timer.debug.enabled"
-
-  // The following configs are applicable only to {@link 
MetricsSnapshotReporter}
-  // added here only to maintain backwards compatibility of config
-  val METRICS_SNAPSHOT_REPORTER_STREAM = "metrics.reporter.%s.stream"
-  val METRICS_SNAPSHOT_REPORTER_INTERVAL= "metrics.reporter.%s.interval"
-  val METRICS_SNAPSHOT_REPORTER_BLACKLIST = "metrics.reporter.%s.blacklist"
-  val METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS = "diagnosticsreporter"
-
-  implicit def Config2Metrics(config: Config) = new MetricsConfig(config)
-}
-
-class MetricsConfig(config: Config) extends ScalaMapConfig(config) {
-  def getMetricsReporters(): Option[String] = 
getOption(MetricsConfig.METRICS_REPORTERS)
-
-  def getMetricsFactoryClass(name: String): Option[String] = 
getOption(MetricsConfig.METRICS_REPORTER_FACTORY format name)
-
-  def getMetricsSnapshotReporterStream(name: String): Option[String] = 
getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM format name)
-
-  def getMetricsSnapshotReporterInterval(name: String): Int = 
getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL format 
name).getOrElse("60").toInt
-
-  def getMetricsSnapshotReporterBlacklist(name: String): Option[String] = 
getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_BLACKLIST format name)
-
-  /**
-   * Returns a list of all metrics names from the config file. Useful for
-   * getting individual metrics.
-   */
-  def getMetricReporterNames() = {
-    getMetricsReporters match {
-      case Some(mr) => if (!"".equals(mr)) {
-        mr.split(",").map(name => name.trim).toList
-      } else {
-        List[String]()
-      }
-      case _ => List[String]()
-    }
-  }
-
-  /**
-   * Returns the flag to turn on/off the timer metrics.
-   * @return Boolean flag to enable the timer metrics
-   */
-  def getMetricsTimerEnabled: Boolean = 
getBoolean(MetricsConfig.METRICS_TIMER_ENABLED, true)
-
-  /**
-    * Returns the flag to enable the debug timer metrics. These metrics
-    * are turned off by default for better performance.
-    * @return Boolean
-    */
-  def getMetricsTimerDebugEnabled: Boolean = 
getBoolean(MetricsConfig.METRICS_TIMER_DEBUG_ENABLED, false)
-
-}
\ No newline at end of file
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index b55f057..a90ab93 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -32,7 +32,6 @@ import com.google.common.annotations.VisibleForTesting
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.samza.checkpoint.{CheckpointListener, OffsetManager, 
OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config._
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
@@ -157,7 +156,8 @@ object SamzaContainer extends Logging {
     val systemProducersMetrics = new SystemProducersMetrics(registry)
     val systemConsumersMetrics = new SystemConsumersMetrics(registry)
     val offsetManagerMetrics = new OffsetManagerMetrics(registry)
-    val clock = if (config.getMetricsTimerEnabled) {
+    val metricsConfig = new MetricsConfig(config)
+    val clock = if (metricsConfig.getMetricsTimerEnabled) {
       new HighResolutionClock {
         override def nanoTime(): Long = System.nanoTime()
       }
@@ -410,7 +410,7 @@ object SamzaContainer extends Logging {
     info("Setting up metrics reporters.")
 
     val reporters =
-      MetricsReporterLoader.getMetricsReporters(config, containerName, 
classLoader).asScala.toMap ++ customReporters
+      MetricsReporterLoader.getMetricsReporters(metricsConfig, containerName, 
classLoader).asScala.toMap ++ customReporters
 
     info("Got metrics reporters: %s" format reporters.keys)
 
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 1e489ed..9c86e91 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -29,6 +29,7 @@ import org.apache.samza.metrics.MetricsRegistryMap
 import 
org.apache.samza.runtime.ApplicationRunnerMain.ApplicationRunnerCommandLine
 import org.apache.samza.runtime.ApplicationRunnerOperation
 import org.apache.samza.system.{StreamSpec, SystemAdmins}
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.util.{CoordinatorStreamUtil, Logging, StreamUtil, Util}
 
 import scala.collection.JavaConverters._
@@ -113,9 +114,11 @@ class JobRunner(config: Config) extends Logging {
     // if diagnostics is enabled, create diagnostics stream if it doesnt exist
     if (new JobConfig(config).getDiagnosticsEnabled) {
       val DIAGNOSTICS_STREAM_ID = "samza-diagnostics-stream-id"
-      val diagnosticsSystemStreamName = new MetricsConfig(config).
-        
getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS).
-        getOrElse(throw new ConfigException("Missing required config: " +
+      val diagnosticsSystemStreamName = JavaOptionals.toRichOptional(
+        new MetricsConfig(config)
+          
.getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS))
+        .toOption
+        .getOrElse(throw new ConfigException("Missing required config: " +
           String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM,
             MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)))
 
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index a21668b..7bde882 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -20,7 +20,7 @@
 package org.apache.samza.metrics
 
 import org.apache.samza.clustermanager.SamzaApplicationState
-import org.apache.samza.config.{ClusterManagerConfig, Config}
+import org.apache.samza.config.{ClusterManagerConfig, Config, MetricsConfig}
 import org.apache.samza.util.Logging
 
 /**
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
index d45b27a..6d77e51 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -21,9 +21,8 @@ package org.apache.samza.metrics.reporter
 
 import org.apache.samza.util.{Logging, StreamUtil, Util}
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, SerializerConfig, SystemConfig}
+import org.apache.samza.config.{Config, MetricsConfig, SerializerConfig, 
SystemConfig}
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.metrics.MetricsReporterFactory
@@ -43,8 +42,9 @@ class MetricsSnapshotReporterFactory extends 
MetricsReporterFactory with Logging
     val jobId = config
       .getJobId
 
-    val metricsSystemStreamName = config
-      .getMetricsSnapshotReporterStream(name)
+    val metricsConfig = new MetricsConfig(config)
+    val metricsSystemStreamName = 
JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterStream(name))
+      .toOption
       .getOrElse(throw new SamzaException("No metrics stream defined in 
config."))
 
     val systemStream = 
StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
@@ -83,12 +83,11 @@ class MetricsSnapshotReporterFactory extends 
MetricsReporterFactory with Logging
 
     info("Got serde %s." format serde)
 
-    val pollingInterval: Int = config
-      .getMetricsSnapshotReporterInterval(name)
+    val pollingInterval: Int = 
metricsConfig.getMetricsSnapshotReporterInterval(name)
 
     info("Setting polling interval to %d" format pollingInterval)
 
-    val blacklist = config.getMetricsSnapshotReporterBlacklist(name)
+    val blacklist = 
JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterBlacklist(name)).toOption
     info("Setting blacklist to %s" format blacklist)
 
     val reporter = new MetricsSnapshotReporter(
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestMetricsConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestMetricsConfig.java
new file mode 100644
index 0000000..f4e889c
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestMetricsConfig.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.Collections;
+import java.util.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestMetricsConfig {
+  @Test
+  public void testGetMetricsFactoryClass() {
+    String metricsReporterName = "metricReporterName";
+    String metricsReporterValue = "metrics.reporter.class";
+    Config config = new MapConfig(
+        ImmutableMap.of(String.format(MetricsConfig.METRICS_REPORTER_FACTORY, 
metricsReporterName),
+            metricsReporterValue));
+    assertEquals(Optional.of(metricsReporterValue),
+        new MetricsConfig(config).getMetricsFactoryClass(metricsReporterName));
+
+    assertFalse(metricsReporterValue,
+        new MetricsConfig(new 
MapConfig()).getMetricsFactoryClass("someName").isPresent());
+  }
+
+  @Test
+  public void testGetMetricsSnapshotReporterStream() {
+    String metricsReporterName = "metricReporterName";
+    String value = "reporter-stream";
+    Config config = new MapConfig(
+        
ImmutableMap.of(String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM, 
metricsReporterName), value));
+    assertEquals(Optional.of(value), new 
MetricsConfig(config).getMetricsSnapshotReporterStream(metricsReporterName));
+
+    assertFalse(value, new MetricsConfig(new 
MapConfig()).getMetricsSnapshotReporterStream("someName").isPresent());
+  }
+
+  @Test
+  public void testGetMetricsSnapshotReporterInterval() {
+    String metricsReporterName = "metricReporterName";
+    String value = "10";
+    Config config = new MapConfig(
+        
ImmutableMap.of(String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL, 
metricsReporterName), value));
+    assertEquals(10, new 
MetricsConfig(config).getMetricsSnapshotReporterInterval(metricsReporterName));
+
+    assertEquals(MetricsConfig.DEFAULT_METRICS_SNAPSHOT_REPORTER_INTERVAL,
+        new MetricsConfig(new 
MapConfig()).getMetricsSnapshotReporterInterval("someName"));
+  }
+
+  @Test
+  public void testGetMetricsSnapshotReporterBlacklist() {
+    String metricsReporterName = "metricReporterName";
+    String value = "metric0|metric1";
+    Config config = new MapConfig(
+        
ImmutableMap.of(String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_BLACKLIST,
 metricsReporterName), value));
+    assertEquals(Optional.of(value),
+        new 
MetricsConfig(config).getMetricsSnapshotReporterBlacklist(metricsReporterName));
+
+    assertFalse(value, new MetricsConfig(new 
MapConfig()).getMetricsSnapshotReporterBlacklist("someName").isPresent());
+  }
+
+  @Test
+  public void testGetMetricReporterNames() {
+    Config config = new MapConfig(
+        ImmutableMap.of(MetricsConfig.METRICS_REPORTERS, "reporter0.class, 
reporter1.class, reporter2.class "));
+    assertEquals(ImmutableList.of("reporter0.class", "reporter1.class", 
"reporter2.class"),
+        new MetricsConfig(config).getMetricReporterNames());
+
+    Config configEmptyValue = new 
MapConfig(ImmutableMap.of(MetricsConfig.METRICS_REPORTERS, ""));
+    assertEquals(Collections.emptyList(), new 
MetricsConfig(configEmptyValue).getMetricReporterNames());
+
+    assertEquals(Collections.emptyList(), new MetricsConfig(new 
MapConfig()).getMetricReporterNames());
+  }
+
+  @Test
+  public void testGetMetricsTimerEnabled() {
+    Config config = new 
MapConfig(ImmutableMap.of(MetricsConfig.METRICS_TIMER_ENABLED, "true"));
+    assertTrue(new MetricsConfig(config).getMetricsTimerEnabled());
+
+    config = new 
MapConfig(ImmutableMap.of(MetricsConfig.METRICS_TIMER_ENABLED, "false"));
+    assertFalse(new MetricsConfig(config).getMetricsTimerEnabled());
+
+    assertTrue(new MetricsConfig(new MapConfig()).getMetricsTimerEnabled());
+  }
+
+  @Test
+  public void testGetMetricsTimerDebugEnabled() {
+    Config config = new 
MapConfig(ImmutableMap.of(MetricsConfig.METRICS_TIMER_DEBUG_ENABLED, "true"));
+    assertTrue(new MetricsConfig(config).getMetricsTimerDebugEnabled());
+
+    config = new 
MapConfig(ImmutableMap.of(MetricsConfig.METRICS_TIMER_DEBUG_ENABLED, "false"));
+    assertFalse(new MetricsConfig(config).getMetricsTimerDebugEnabled());
+
+    assertFalse(new MetricsConfig(new 
MapConfig()).getMetricsTimerDebugEnabled());
+  }
+}
\ No newline at end of file
diff --git 
a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java 
b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index d873c16..e436a06 100644
--- 
a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ 
b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -144,7 +144,7 @@ public class TestCachingTable {
   private void initTables(boolean isTimerMetricsDisabled, ReadWriteTable ... 
tables) {
     Map<String, String> config = new HashMap<>();
     if (isTimerMetricsDisabled) {
-      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+      config.put(MetricsConfig.METRICS_TIMER_ENABLED, "false");
     }
     Context context = new MockContext();
     doReturn(new MapConfig(config)).when(context.getJobContext()).getConfig();
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index fdb473a..b85ff1a 100644
--- 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -22,8 +22,7 @@ package org.apache.samza.storage.kv
 import java.io.File
 
 import org.apache.samza.SamzaException
-import org.apache.samza.config.MetricsConfig.Config2Metrics
-import org.apache.samza.config.StorageConfig
+import org.apache.samza.config.{MetricsConfig, StorageConfig}
 import org.apache.samza.context.{ContainerContext, JobContext}
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.serializers.Serde
@@ -148,7 +147,8 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends 
StorageEngineFactory[K, V]
     // create the storage engine and return
     // TODO: Decide if we should use raw bytes when restoring
     val keyValueStorageEngineMetrics = new 
KeyValueStorageEngineMetrics(storeName, registry)
-    val clock = if (jobContext.getConfig.getMetricsTimerEnabled) {
+    val metricsConfig = new MetricsConfig(jobContext.getConfig)
+    val clock = if (metricsConfig.getMetricsTimerEnabled) {
       new HighResolutionClock {
         override def nanoTime(): Long = System.nanoTime()
       }
diff --git 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
index 0fd4539..3e4f50e 100644
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
@@ -137,7 +137,7 @@ public class TestLocalTableRead {
   private LocalTable createTable(boolean isTimerDisabled) {
     Map<String, String> config = new HashMap<>();
     if (isTimerDisabled) {
-      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+      config.put(MetricsConfig.METRICS_TIMER_ENABLED, "false");
     }
     Context context = mock(Context.class);
     JobContext jobContext = mock(JobContext.class);
diff --git 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
index 15cba1b..9bd46ba 100644
--- 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
+++ 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
@@ -230,7 +230,7 @@ public class TestLocalTableWrite {
   private LocalTable createTable(boolean isTimerDisabled) {
     Map<String, String> config = new HashMap<>();
     if (isTimerDisabled) {
-      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+      config.put(MetricsConfig.METRICS_TIMER_ENABLED, "false");
     }
     Context context = mock(Context.class);
     JobContext jobContext = mock(JobContext.class);
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index c5830a7..72dd70f 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -20,8 +20,7 @@
 package org.apache.samza.job.yarn
 
 import org.apache.samza.clustermanager.SamzaApplicationState
-import org.apache.samza.config.Config
-import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.config.{Config, MetricsConfig}
 import org.apache.samza.util.Logging
 import org.apache.samza.util.MetricsReporterLoader
 import org.apache.samza.metrics.ReadableMetricsRegistry
@@ -43,8 +42,9 @@ class SamzaAppMasterMetrics(val config: Config,
   val registry: ReadableMetricsRegistry,
   val classLoader: ClassLoader) extends MetricsHelper with Logging {
 
+  private val metricsConfig = new MetricsConfig(config)
   val reporters =
-    MetricsReporterLoader.getMetricsReporters(config, 
SamzaAppMasterMetrics.sourceName, classLoader).asScala
+    MetricsReporterLoader.getMetricsReporters(metricsConfig, 
SamzaAppMasterMetrics.sourceName, classLoader).asScala
   reporters.values.foreach(_.register(SamzaAppMasterMetrics.sourceName, 
registry))
 
   def start() {

Reply via email to