rmatharu commented on a change in pull request #1021: Adding writing of container.metadata file, and moving exception writing logic to DiagnosticsManager URL: https://github.com/apache/samza/pull/1021#discussion_r282292988
########## File path: samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.diagnostics; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.lang.reflect.InvocationTargetException; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.samza.metrics.reporter.Metrics; +import org.apache.samza.metrics.reporter.MetricsHeader; +import org.apache.samza.metrics.reporter.MetricsSnapshot; +import org.apache.samza.runtime.LocalContainerRunner; +import org.apache.samza.serializers.MetricsSnapshotSerdeV2; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemProducer; +import org.apache.samza.system.SystemStream; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; +import 
scala.collection.JavaConverters; + + +/** + * Responsible for publishing data to the diagnostic stream. + * Currently emits exception/error events obtained using a customer-appender that attaches to the root-logger. + */ +public class DiagnosticsManager { + + private static final Logger LOG = LoggerFactory.getLogger(DiagnosticsManager.class); + private static final Duration DEFAULT_PUBLISH_PERIOD = Duration.ofSeconds(60); + // Period size for pushing data to the diagnostic stream + + private static final String PUBLISH_THREAD_NAME = "DiagnosticsManager Thread-%d"; + private static final String METRICS_GROUP_NAME = "org.apache.samza.container.SamzaContainerMetrics"; + // Using SamzaContainerMetrics as the group name to maintain compatibility with existing diagnostics + + // Parameters used for populating the MetricHeader when sending diagnostic-stream messages + private final String jobName; + private final String jobId; + private final String containerName; + private final String executionEnvContainerId; + private final String taskClassVersion; + private final String samzaVersion; + private final String hostname; + private final Instant resetTime; + + private SystemProducer systemProducer; // SystemProducer for writing diagnostics data + private BoundedList<DiagnosticsExceptionEvent> exceptions; // A BoundedList for storing DiagnosticExceptionEvent + private final ScheduledExecutorService scheduler; // Scheduler for pushing data to the diagnostic stream + private final SystemStream diagnosticSystemStream; + + public DiagnosticsManager(String jobName, String jobId, String containerName, String executionEnvContainerId, + String taskClassVersion, String samzaVersion, String hostname, SystemStream diagnosticSystemStream, + SystemProducer systemProducer) { + this.jobName = jobName; + this.jobId = jobId; + this.containerName = containerName; + this.executionEnvContainerId = executionEnvContainerId; + this.taskClassVersion = taskClassVersion; + this.samzaVersion = 
samzaVersion; + this.hostname = hostname; + resetTime = Instant.now(); Review comment: Yes, in MetricsHeader, reset-time and time are different things. Reset-time is the container startup timestamp; time is the timestamp at which the metric is emitted. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
