Repository: samza Updated Branches: refs/heads/master a8ddede18 -> ce5765796
SAMZA-1733: Create the diagnostics topic if diagnostics are enabled * Current SnapshotReporter semantics require the stream to be specified as <SYS-NAME>.<STREAM-NAME> * The topic is created in JobRunner so that the AM (ApplicationMaster) can also emit metrics (if desired). Author: rmath...@linkedin.com <rmath...@linkedin.com> Reviewers: Cameron Lee <ca...@linkedin.com> Closes #602 from rmatharu/topiccreate Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ce576579 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ce576579 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ce576579 Branch: refs/heads/master Commit: ce5765796c233341fbe4479db1d475964b9b2b2b Parents: a8ddede Author: rmath...@linkedin.com <rmath...@linkedin.com> Authored: Wed Aug 15 10:55:38 2018 -0700 Committer: Prateek Maheshwari <pmaheshw...@apache.org> Committed: Wed Aug 15 10:55:38 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/config/MetricsConfig.scala | 1 + .../scala/org/apache/samza/job/JobRunner.scala | 43 ++++++++++++++++---- 2 files changed, 35 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ce576579/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala index 258228c..dab9527 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala @@ -31,6 +31,7 @@ object MetricsConfig { val METRICS_SNAPSHOT_REPORTER_INTERVAL= "metrics.reporter.%s.interval" val METRICS_TIMER_ENABLED= "metrics.timer.enabled" val METRICS_SNAPSHOT_REPORTER_BLACKLIST = "metrics.reporter.%s.blacklist" + 
val METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS = "diagnosticsreporter" implicit def Config2Metrics(config: Config) = new MetricsConfig(config) } http://git-wip-us.apache.org/repos/asf/samza/blob/ce576579/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index c6e14f2..399aa14 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -23,16 +23,15 @@ package org.apache.samza.job import java.util.concurrent.TimeUnit import org.apache.samza.SamzaException -import org.apache.samza.config.Config +import org.apache.samza.config._ import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer} import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig} -import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish} import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.runtime.ApplicationRunnerMain.ApplicationRunnerCommandLine import org.apache.samza.runtime.ApplicationRunnerOperation -import org.apache.samza.system.StreamSpec -import org.apache.samza.util.{CoordinatorStreamUtil, Logging, Util} +import org.apache.samza.system.{StreamSpec, SystemAdmins} +import org.apache.samza.util.{CoordinatorStreamUtil, Logging, StreamUtil, Util} import scala.collection.JavaConverters._ @@ -82,21 +81,21 @@ class JobRunner(config: Config) extends Logging { val jobFactory: StreamJobFactory = getJobFactory val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap) val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap) + val systemAdmins = new 
SystemAdmins(config) // Create the coordinator stream if it doesn't exist info("Creating coordinator stream") val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config) - val systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config) - val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config) + val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem) val streamName = coordinatorSystemStream.getStream val coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem) - systemAdmin.start() - if (systemAdmin.createStream(coordinatorSpec)) { + coordinatorSystemAdmin.start() + if (coordinatorSystemAdmin.createStream(coordinatorSpec)) { info("Created coordinator stream %s." format streamName) } else { info("Coordinator stream %s already exists." format streamName) } - systemAdmin.stop() + coordinatorSystemAdmin.stop() if (resetJobConfig) { info("Storing config in coordinator stream.") @@ -118,6 +117,32 @@ class JobRunner(config: Config) extends Logging { } coordinatorSystemProducer.stop() + + // if diagnostics is enabled, create diagnostics stream if it doesnt exist + if (new JobConfig(config).getDiagnosticsEnabled) { + val DIAGNOSTICS_STREAM_ID = "samza-diagnostics-stream-id" + val diagnosticsSystemStreamName = new MetricsConfig(config). + getMetricsReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS). 
+ getOrElse(throw new ConfigException("Missing required config: " + + String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM, + MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS))) + + val diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsSystemStreamName) + val diagnosticsSysAdmin = systemAdmins.getSystemAdmin(diagnosticsSystemStream.getSystem) + val diagnosticsStreamSpec = new StreamSpec(DIAGNOSTICS_STREAM_ID, diagnosticsSystemStream.getStream, + diagnosticsSystemStream.getSystem, new StreamConfig(config).getStreamProperties(DIAGNOSTICS_STREAM_ID)) + + info("Creating diagnostics stream %s" format diagnosticsSystemStream.getStream) + diagnosticsSysAdmin.start() + if (diagnosticsSysAdmin.createStream(diagnosticsStreamSpec)) { + info("Created diagnostics stream %s" format diagnosticsSystemStream.getStream) + } else { + info("Diagnostics stream %s already exists" format diagnosticsSystemStream.getStream) + } + diagnosticsSysAdmin.stop() + } + + // Create the actual job, and submit it. val job = jobFactory.getJob(config)