Repository: samza
Updated Branches:
  refs/heads/master f249e71a2 -> a706adda8


SAMZA-1742: Add metrics reporter parameter to LocalApplicationRunner 
constructor.

vjagadish1989 this has already been reviewed and approved by you and 
cameronlee314 internally. Please approve here. Thanks!

Author: Daniel Nishimura <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes #550 from dnishimura/samza-1742-localapplicationrunner-custom-metrics


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a706adda
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a706adda
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a706adda

Branch: refs/heads/master
Commit: a706adda8767a057a81b316bced5ef3422def635
Parents: f249e71
Author: Daniel Nishimura <[email protected]>
Authored: Fri Jun 8 14:02:53 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Fri Jun 8 14:02:53 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/runtime/LocalApplicationRunner.java  | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a706adda/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index d64e57a..d3df741 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +43,7 @@ import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.DistributedLockWithState;
 import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.processor.StreamProcessor;
 import org.apache.samza.processor.StreamProcessorLifecycleListener;
@@ -65,6 +67,7 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
   private final CountDownLatch shutdownLatch = new CountDownLatch(1);
   private final AtomicInteger numProcessorsToStart = new AtomicInteger();
   private final AtomicReference<Throwable> failure = new AtomicReference<>();
+  private final Map<String, MetricsReporter> customMetricsReporters;
 
   private ApplicationStatus appStatus = ApplicationStatus.New;
 
@@ -124,8 +127,13 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
   }
 
   public LocalApplicationRunner(Config config) {
+    this(config, new HashMap<>());
+  }
+
+  public LocalApplicationRunner(Config config, Map<String, MetricsReporter> 
customMetricsReporters) {
     super(config);
-    uid = UUID.randomUUID().toString();
+    this.uid = UUID.randomUUID().toString();
+    this.customMetricsReporters = customMetricsReporters;
   }
 
   @Override
@@ -313,10 +321,10 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
   private StreamProcessor getStreamProcessorInstance(Config config, Object 
taskFactory, StreamProcessorLifecycleListener listener) {
     if (taskFactory instanceof StreamTaskFactory) {
       return new StreamProcessor(
-          config, new HashMap<>(), (StreamTaskFactory) taskFactory, listener);
+          config, customMetricsReporters, (StreamTaskFactory) taskFactory, 
listener);
     } else if (taskFactory instanceof AsyncStreamTaskFactory) {
       return new StreamProcessor(
-          config, new HashMap<>(), (AsyncStreamTaskFactory) taskFactory, 
listener);
+          config, customMetricsReporters, (AsyncStreamTaskFactory) 
taskFactory, listener);
     } else {
       throw new SamzaException(String.format("%s is not a valid task factory",
           taskFactory.getClass().getCanonicalName()));

Reply via email to