Repository: samza
Updated Branches:
  refs/heads/master a8ddede18 -> ce5765796


SAMZA-1733: Create diagnostic topic if diagnostics enabled

* Current SnapshotReporter semantics are to specify the stream as <SYS-NAME>.<STREAM-NAME>

* We create the topic in JobRunner so that the AM can also emit metrics (if desired).

Author: rmath...@linkedin.com <rmath...@linkedin.com>

Reviewers: Cameron Lee <ca...@linkedin.com>

Closes #602 from rmatharu/topiccreate


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ce576579
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ce576579
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ce576579

Branch: refs/heads/master
Commit: ce5765796c233341fbe4479db1d475964b9b2b2b
Parents: a8ddede
Author: rmath...@linkedin.com <rmath...@linkedin.com>
Authored: Wed Aug 15 10:55:38 2018 -0700
Committer: Prateek Maheshwari <pmaheshw...@apache.org>
Committed: Wed Aug 15 10:55:38 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/MetricsConfig.scala |  1 +
 .../scala/org/apache/samza/job/JobRunner.scala  | 43 ++++++++++++++++----
 2 files changed, 35 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ce576579/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
index 258228c..dab9527 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
@@ -31,6 +31,7 @@ object MetricsConfig {
   val METRICS_SNAPSHOT_REPORTER_INTERVAL= "metrics.reporter.%s.interval"
   val METRICS_TIMER_ENABLED= "metrics.timer.enabled"
   val METRICS_SNAPSHOT_REPORTER_BLACKLIST = "metrics.reporter.%s.blacklist"
+  val METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS = "diagnosticsreporter"
 
   implicit def Config2Metrics(config: Config) = new MetricsConfig(config)
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ce576579/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index c6e14f2..399aa14 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -23,16 +23,15 @@ package org.apache.samza.job
 import java.util.concurrent.TimeUnit
 
 import org.apache.samza.SamzaException
-import org.apache.samza.config.Config
+import org.apache.samza.config._
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, 
CoordinatorStreamSystemProducer}
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
-import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish}
 import org.apache.samza.metrics.MetricsRegistryMap
 import 
org.apache.samza.runtime.ApplicationRunnerMain.ApplicationRunnerCommandLine
 import org.apache.samza.runtime.ApplicationRunnerOperation
-import org.apache.samza.system.StreamSpec
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging, Util}
+import org.apache.samza.system.{StreamSpec, SystemAdmins}
+import org.apache.samza.util.{CoordinatorStreamUtil, Logging, StreamUtil, Util}
 
 import scala.collection.JavaConverters._
 
@@ -82,21 +81,21 @@ class JobRunner(config: Config) extends Logging {
     val jobFactory: StreamJobFactory = getJobFactory
     val coordinatorSystemConsumer = new 
CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
     val coordinatorSystemProducer = new 
CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+    val systemAdmins = new SystemAdmins(config)
 
     // Create the coordinator stream if it doesn't exist
     info("Creating coordinator stream")
     val coordinatorSystemStream = 
CoordinatorStreamUtil.getCoordinatorSystemStream(config)
-    val systemFactory = 
CoordinatorStreamUtil.getCoordinatorSystemFactory(config)
-    val systemAdmin = 
systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
+    val coordinatorSystemAdmin = 
systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
     val streamName = coordinatorSystemStream.getStream
     val coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, 
coordinatorSystemStream.getSystem)
-    systemAdmin.start()
-    if (systemAdmin.createStream(coordinatorSpec)) {
+    coordinatorSystemAdmin.start()
+    if (coordinatorSystemAdmin.createStream(coordinatorSpec)) {
       info("Created coordinator stream %s." format streamName)
     } else {
       info("Coordinator stream %s already exists." format streamName)
     }
-    systemAdmin.stop()
+    coordinatorSystemAdmin.stop()
 
     if (resetJobConfig) {
       info("Storing config in coordinator stream.")
@@ -118,6 +117,32 @@ class JobRunner(config: Config) extends Logging {
     }
     coordinatorSystemProducer.stop()
 
+
+    // if diagnostics is enabled, create diagnostics stream if it doesnt exist
+    if (new JobConfig(config).getDiagnosticsEnabled) {
+      val DIAGNOSTICS_STREAM_ID = "samza-diagnostics-stream-id"
+      val diagnosticsSystemStreamName = new MetricsConfig(config).
+        
getMetricsReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS).
+        getOrElse(throw new ConfigException("Missing required config: " +
+          String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM,
+            MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)))
+
+      val diagnosticsSystemStream = 
StreamUtil.getSystemStreamFromNames(diagnosticsSystemStreamName)
+      val diagnosticsSysAdmin = 
systemAdmins.getSystemAdmin(diagnosticsSystemStream.getSystem)
+      val diagnosticsStreamSpec = new StreamSpec(DIAGNOSTICS_STREAM_ID, 
diagnosticsSystemStream.getStream,
+        diagnosticsSystemStream.getSystem, new 
StreamConfig(config).getStreamProperties(DIAGNOSTICS_STREAM_ID))
+
+      info("Creating diagnostics stream %s" format 
diagnosticsSystemStream.getStream)
+      diagnosticsSysAdmin.start()
+      if (diagnosticsSysAdmin.createStream(diagnosticsStreamSpec)) {
+        info("Created diagnostics stream %s" format 
diagnosticsSystemStream.getStream)
+      } else {
+        info("Diagnostics stream %s already exists" format 
diagnosticsSystemStream.getStream)
+      }
+      diagnosticsSysAdmin.stop()
+    }
+
+
     // Create the actual job, and submit it.
     val job = jobFactory.getJob(config)
 

Reply via email to