Repository: samza Updated Branches: refs/heads/master f249e71a2 -> a706adda8
SAMZA-1742: Add metrics reporter parameter to LocalApplicationRunner constructor. vjagadish1989 this has already been reviewed and approved by you and cameronlee314 internally. Please approve here. Thanks! Author: Daniel Nishimura <[email protected]> Reviewers: Jagadish <[email protected]> Closes #550 from dnishimura/samza-1742-localapplicationrunner-custom-metrics Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a706adda Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a706adda Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a706adda Branch: refs/heads/master Commit: a706adda8767a057a81b316bced5ef3422def635 Parents: f249e71 Author: Daniel Nishimura <[email protected]> Authored: Fri Jun 8 14:02:53 2018 -0700 Committer: Jagadish <[email protected]> Committed: Fri Jun 8 14:02:53 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/runtime/LocalApplicationRunner.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a706adda/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index d64e57a..d3df741 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import java.time.Duration; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -42,6 +43,7 @@ import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.DistributedLockWithState; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.job.ApplicationStatus; +import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.processor.StreamProcessor; import org.apache.samza.processor.StreamProcessorLifecycleListener; @@ -65,6 +67,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { private final CountDownLatch shutdownLatch = new CountDownLatch(1); private final AtomicInteger numProcessorsToStart = new AtomicInteger(); private final AtomicReference<Throwable> failure = new AtomicReference<>(); + private final Map<String, MetricsReporter> customMetricsReporters; private ApplicationStatus appStatus = ApplicationStatus.New; @@ -124,8 +127,13 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { } public LocalApplicationRunner(Config config) { + this(config, new HashMap<>()); + } + + public LocalApplicationRunner(Config config, Map<String, MetricsReporter> customMetricsReporters) { super(config); - uid = UUID.randomUUID().toString(); + this.uid = UUID.randomUUID().toString(); + this.customMetricsReporters = customMetricsReporters; } @Override @@ -313,10 +321,10 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { private StreamProcessor getStreamProcessorInstance(Config config, Object taskFactory, StreamProcessorLifecycleListener listener) { if (taskFactory instanceof StreamTaskFactory) { return new StreamProcessor( - config, new HashMap<>(), (StreamTaskFactory) taskFactory, listener); + config, customMetricsReporters, (StreamTaskFactory) taskFactory, listener); } else if (taskFactory instanceof AsyncStreamTaskFactory) { return new StreamProcessor( - config, new HashMap<>(), (AsyncStreamTaskFactory) taskFactory, listener); + config, customMetricsReporters, (AsyncStreamTaskFactory) taskFactory, listener); } else { throw new SamzaException(String.format("%s is not a valid task factory", taskFactory.getClass().getCanonicalName()));
