This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 0faf072 SAMZA-2707: Convert metrics snapshot reporter code to java
(#1558)
0faf072 is described below
commit 0faf072792056bdd37ab70e1afd386309b571bd3
Author: Cameron Lee <[email protected]>
AuthorDate: Tue Nov 30 12:34:09 2021 -0800
SAMZA-2707: Convert metrics snapshot reporter code to java (#1558)
API changes:
1. (backwards compatible) Added RegistryWithSource Java class to help with
implementing MetricsReporters in Java
2. (applies to usage of org.apache.samza.metrics.reporter.Metrics* classes
from samza-core) Converted classes no longer provide direct access to
constructor parameters (e.g. Metrics.immutableMetrics() is no longer
accessible). "Container" objects like Metrics, MetricsSnapshot, and
MetricsHeader still have getters for their fields.
3. (applies to usage of org.apache.samza.metrics.reporter.Metrics* classes
from samza-core) "Container" objects like Metrics, MetricsSnapshot, and
MetricsHeader no longer treat their fields as @BeanProperty, so they no longer
have setters. It's better that container objects are immutable anyways, and
there weren't any usages of the setters within Samza code.
---
.../samza/metrics/MetricsRegistryWithSource.java | 36 ++--
.../org/apache/samza/metrics/reporter/Metrics.java | 77 +++++++
.../samza/metrics/reporter/MetricsHeader.java | 160 ++++++++++++++
.../samza/metrics/reporter/MetricsSnapshot.java | 80 +++++++
.../metrics/reporter/MetricsSnapshotReporter.java | 230 +++++++++++++++++++++
.../reporter/MetricsSnapshotReporterFactory.java | 142 +++++++++++++
.../apache/samza/metrics/reporter/Metrics.scala | 58 ------
.../samza/metrics/reporter/MetricsHeader.scala | 71 -------
.../metrics/reporter/MetricsSnapshotReporter.scala | 192 -----------------
.../reporter/MetricsSnapshotReporterFactory.scala | 133 ------------
.../apache/samza/metrics/reporter/TestMetrics.java | 49 +++++
.../samza/metrics/reporter/TestMetricsHeader.java | 77 +++++++
.../metrics/reporter/TestMetricsSnapshot.java | 49 +++++
.../TestMetricsSnapshotReporter.java | 40 ++--
.../TestMetricsSnapshotReporterFactory.java | 117 +++++++++++
.../serializers/TestMetricsSnapshotSerde.java | 81 ++++++++
.../serializers/TestMetricsSnapshotSerdeV2.java | 80 ++++---
.../serializers/TestMetricsSnapshotSerde.scala | 48 -----
18 files changed, 1148 insertions(+), 572 deletions(-)
diff --git
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshot.scala
b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistryWithSource.java
similarity index 54%
rename from
samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshot.scala
rename to
samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistryWithSource.java
index ad08eb7..3ad47d5 100644
---
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshot.scala
+++
b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistryWithSource.java
@@ -16,28 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.samza.metrics;
-package org.apache.samza.metrics.reporter
-
-import java.util.Map
-import java.util.HashMap
-import scala.beans.BeanProperty
+/**
+ * Simple holder for {@link ReadableMetricsRegistry} and its source, used for
implementations of
+ * {@link MetricsReporter}.
+ */
+public class MetricsRegistryWithSource {
+ private final String source;
+ private final ReadableMetricsRegistry registry;
-object MetricsSnapshot {
- def fromMap(map: Map[String, Map[String, Object]]) = {
- val header = MetricsHeader.fromMap(map.get("header"))
- val metrics = Metrics.fromMap(map.get("metrics").asInstanceOf[Map[String,
Map[String, Object]]])
- new MetricsSnapshot(header, metrics)
+ public MetricsRegistryWithSource(String source, ReadableMetricsRegistry
registry) {
+ this.source = source;
+ this.registry = registry;
}
-}
-class MetricsSnapshot(@BeanProperty val header: MetricsHeader, @BeanProperty
val metrics: Metrics) {
- def getAsMap(): Map[String, Object] = {
- val map = new HashMap[String, Object]
-
- map.put("header", header.getAsMap)
- map.put("metrics", metrics.getAsMap)
+ public String getSource() {
+ return source;
+ }
- map
+ public ReadableMetricsRegistry getRegistry() {
+ return registry;
}
-}
+}
\ No newline at end of file
diff --git
a/samza-core/src/main/java/org/apache/samza/metrics/reporter/Metrics.java
b/samza-core/src/main/java/org/apache/samza/metrics/reporter/Metrics.java
new file mode 100644
index 0000000..8c321c5
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/Metrics.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics.reporter;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+
+public class Metrics {
+ private final Map<String, Map<String, Object>> immutableMetrics;
+
+ public Metrics() {
+ this(Collections.emptyMap());
+ }
+
+ public Metrics(Map<String, Map<String, Object>> metrics) {
+ Map<String, Map<String, Object>> metricsMapCopy = new HashMap<>();
+ metrics.forEach((key, value) -> metricsMapCopy.put(key,
Collections.unmodifiableMap(new HashMap<>(value))));
+ this.immutableMetrics = metricsMapCopy;
+ }
+
+ public <T> T get(String group, String metricName) {
+ return (T) this.immutableMetrics.get(group).get(metricName);
+ }
+
+ public Map<String, Object> get(String group) {
+ return this.immutableMetrics.get(group);
+ }
+
+ public Map<String, Map<String, Object>> getAsMap() {
+ return Collections.unmodifiableMap(this.immutableMetrics);
+ }
+
+ public static Metrics fromMap(Map<String, Map<String, Object>> map) {
+ return new Metrics(map);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Metrics that = (Metrics) o;
+ return Objects.equals(immutableMetrics, that.immutableMetrics);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(immutableMetrics);
+ }
+
+ @Override
+ public String toString() {
+ return "MetricsJava{" + "immutableMetrics=" + immutableMetrics + '}';
+ }
+}
\ No newline at end of file
diff --git
a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsHeader.java
b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsHeader.java
new file mode 100644
index 0000000..5215038
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsHeader.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics.reporter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+
+public class MetricsHeader {
+ private static final String JOB_NAME = "job-name";
+ private static final String JOB_ID = "job-id";
+ private static final String CONTAINER_NAME = "container-name";
+ private static final String EXEC_ENV_CONTAINER_ID = "exec-env-container-id";
+ private static final String SOURCE = "source";
+ private static final String VERSION = "version";
+ private static final String SAMZA_VERSION = "samza-version";
+ private static final String HOST = "host";
+ private static final String TIME = "time";
+ private static final String RESET_TIME = "reset-time";
+
+ private final String jobName;
+ private final String jobId;
+ private final String containerName;
+ private final String execEnvironmentContainerId;
+ private final String source;
+ private final String version;
+ private final String samzaVersion;
+ private final String host;
+ private final long time;
+ private final long resetTime;
+
+ public MetricsHeader(String jobName, String jobId, String containerName,
String execEnvironmentContainerId,
+ String source, String version, String samzaVersion, String host, long
time, long resetTime) {
+ this.jobName = jobName;
+ this.jobId = jobId;
+ this.containerName = containerName;
+ this.execEnvironmentContainerId = execEnvironmentContainerId;
+ this.source = source;
+ this.version = version;
+ this.samzaVersion = samzaVersion;
+ this.host = host;
+ this.time = time;
+ this.resetTime = resetTime;
+ }
+
+ public Map<String, Object> getAsMap() {
+ Map<String, Object> map = new HashMap<>();
+ map.put(JOB_NAME, jobName);
+ map.put(JOB_ID, jobId);
+ map.put(CONTAINER_NAME, containerName);
+ map.put(EXEC_ENV_CONTAINER_ID, execEnvironmentContainerId);
+ map.put(SOURCE, source);
+ map.put(VERSION, version);
+ map.put(SAMZA_VERSION, samzaVersion);
+ map.put(HOST, host);
+ map.put(TIME, time);
+ map.put(RESET_TIME, resetTime);
+ return map;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public String getContainerName() {
+ return containerName;
+ }
+
+ public String getExecEnvironmentContainerId() {
+ return execEnvironmentContainerId;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public String getSamzaVersion() {
+ return samzaVersion;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public long getResetTime() {
+ return resetTime;
+ }
+
+ public static MetricsHeader fromMap(Map<String, Object> map) {
+ return new MetricsHeader(map.get(JOB_NAME).toString(),
+ map.get(JOB_ID).toString(),
+ map.get(CONTAINER_NAME).toString(),
+ map.get(EXEC_ENV_CONTAINER_ID).toString(),
+ map.get(SOURCE).toString(),
+ map.get(VERSION).toString(),
+ map.get(SAMZA_VERSION).toString(),
+ map.get(HOST).toString(),
+ ((Number) map.get(TIME)).longValue(),
+ ((Number) map.get(RESET_TIME)).longValue());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MetricsHeader that = (MetricsHeader) o;
+ return time == that.time && resetTime == that.resetTime &&
Objects.equals(jobName, that.jobName) && Objects.equals(
+ jobId, that.jobId) && Objects.equals(containerName,
that.containerName) && Objects.equals(
+ execEnvironmentContainerId, that.execEnvironmentContainerId) &&
Objects.equals(source, that.source)
+ && Objects.equals(version, that.version) &&
Objects.equals(samzaVersion, that.samzaVersion) && Objects.equals(
+ host, that.host);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobName, jobId, containerName,
execEnvironmentContainerId, source, version, samzaVersion, host,
+ time, resetTime);
+ }
+
+ @Override
+ public String toString() {
+ return "MetricsHeader{" + "jobName='" + jobName + '\'' + ", jobId='" +
jobId + '\'' + ", containerName='"
+ + containerName + '\'' + ", execEnvironmentContainerId='" +
execEnvironmentContainerId + '\'' + ", source='"
+ + source + '\'' + ", version='" + version + '\'' + ", samzaVersion='"
+ samzaVersion + '\'' + ", host='" + host
+ + '\'' + ", time=" + time + ", resetTime=" + resetTime + '}';
+ }
+}
\ No newline at end of file
diff --git
a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshot.java
b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshot.java
new file mode 100644
index 0000000..00b9deb
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshot.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics.reporter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+
+public class MetricsSnapshot {
+ private static final String HEADER_KEY = "header";
+ private static final String METRICS_KEY = "metrics";
+
+ private final MetricsHeader metricsHeader;
+ private final Metrics metrics;
+
+ public MetricsSnapshot(MetricsHeader metricsHeader, Metrics metrics) {
+ this.metricsHeader = metricsHeader;
+ this.metrics = metrics;
+ }
+
+ public MetricsHeader getHeader() {
+ return metricsHeader;
+ }
+
+ public Metrics getMetrics() {
+ return metrics;
+ }
+
+ public Map<String, Object> getAsMap() {
+ Map<String, Object> map = new HashMap<>();
+ map.put(HEADER_KEY, this.metricsHeader.getAsMap());
+ map.put(METRICS_KEY, this.metrics.getAsMap());
+ return map;
+ }
+
+ public static MetricsSnapshot fromMap(Map<String, Map<String, Object>> map) {
+ MetricsHeader metricsHeader = MetricsHeader.fromMap(map.get(HEADER_KEY));
+ Metrics metrics = Metrics.fromMap((Map<String, Map<String, Object>>)
(Map<?, ?>) map.get(METRICS_KEY));
+ return new MetricsSnapshot(metricsHeader, metrics);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MetricsSnapshot that = (MetricsSnapshot) o;
+ return Objects.equals(metricsHeader, that.metricsHeader) &&
Objects.equals(metrics, that.metrics);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(metricsHeader, metrics);
+ }
+
+ @Override
+ public String toString() {
+ return "MetricsSnapshotJava{" + "metricsHeader=" + metricsHeader + ",
metrics=" + metrics + '}';
+ }
+}
\ No newline at end of file
diff --git
a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.java
b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.java
new file mode 100644
index 0000000..648280a
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics.reporter;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistryWithSource;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.MetricsVisitor;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.serializers.Serializer;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * MetricsSnapshotReporter is a generic metrics reporter that sends metrics to
a stream.
+ *
+ * jobName // my-samza-job
+ * jobId // an id that differentiates multiple executions of the same job
+ * taskName // container_567890
+ * host // eat1-app128.gird
+ * version // 0.0.1
+ * blacklist // Regex of metrics to ignore when flushing
+ */
+public class MetricsSnapshotReporter implements MetricsReporter, Runnable {
+ private static final Logger LOG =
LoggerFactory.getLogger(MetricsSnapshotReporter.class);
+
+ private final SystemProducer producer;
+ private final SystemStream out;
+ private final Duration reportingInterval;
+ private final String jobName;
+ private final String jobId;
+ private final String containerName;
+ private final String version;
+ private final String samzaVersion;
+ private final String host;
+ private final Serializer<MetricsSnapshot> serializer;
+ private final Optional<Pattern> blacklist;
+ private final Clock clock;
+
+ private final String execEnvironmentContainerId;
+ private final ScheduledExecutorService executor;
+ private final long resetTime;
+ private final List<MetricsRegistryWithSource> registries = new ArrayList<>();
+ private final Set<String> blacklistedMetrics = new HashSet<>();
+
+ public MetricsSnapshotReporter(SystemProducer producer, SystemStream out,
Duration reportingInterval, String jobName,
+ String jobId, String containerName, String version, String samzaVersion,
String host,
+ Serializer<MetricsSnapshot> serializer, Optional<Pattern> blacklist,
Clock clock) {
+ this.producer = producer;
+ this.out = out;
+ this.reportingInterval = reportingInterval;
+ this.jobName = jobName;
+ this.jobId = jobId;
+ this.containerName = containerName;
+ this.version = version;
+ this.samzaVersion = samzaVersion;
+ this.host = host;
+ this.serializer = serializer;
+ this.blacklist = blacklist;
+ this.clock = clock;
+
+ this.execEnvironmentContainerId =
+
Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID)).orElse("");
+ this.executor = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("Samza
MetricsSnapshotReporter Thread-%d").setDaemon(true).build());
+ this.resetTime = this.clock.currentTimeMillis();
+ LOG.info(
+ "got metrics snapshot reporter properties [job name: {}, job id: {},
containerName: {}, version: {}, samzaVersion: {}, host: {}, reportingInterval
{}]",
+ jobName, jobId, containerName, version, samzaVersion, host,
reportingInterval);
+ }
+
+ @Override
+ public void start() {
+ LOG.info("Starting producer.");
+ this.producer.start();
+ LOG.info("Starting reporter timer.");
+ this.executor.scheduleWithFixedDelay(this, 0,
reportingInterval.getSeconds(), TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void register(String source, ReadableMetricsRegistry registry) {
+ this.registries.add(new MetricsRegistryWithSource(source, registry));
+ LOG.info("Registering {} with producer.", source);
+ this.producer.register(source);
+ }
+
+ @Override
+ public void stop() {
+ // Scheduling an event with 0 delay to ensure flushing of metrics one last
time before shutdown
+ this.executor.schedule(this, 0, TimeUnit.SECONDS);
+ LOG.info("Stopping reporter timer.");
+ // Allow the scheduled task above to finish, and block for termination
(for max 60 seconds)
+ this.executor.shutdown();
+ try {
+ this.executor.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new SamzaException(e);
+ }
+ LOG.info("Stopping producer.");
+ this.producer.stop();
+ if (!this.executor.isTerminated()) {
+ LOG.warn("Unable to shutdown reporter timer.");
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ innerRun();
+ } catch (Exception e) {
+ // Ignore all exceptions - because subsequent executions of this
scheduled task will be suppressed
+ // by the executor if the current task throws an unhandled exception.
+ LOG.warn("Error while reporting metrics. Will retry in " +
reportingInterval + " seconds.", e);
+ }
+ }
+
+ public void innerRun() {
+ LOG.debug("Begin flushing metrics.");
+ for (MetricsRegistryWithSource metricsRegistryWithSource :
this.registries) {
+ String source = metricsRegistryWithSource.getSource();
+ ReadableMetricsRegistry registry =
metricsRegistryWithSource.getRegistry();
+ LOG.debug("Flushing metrics for {}.", source);
+ Map<String, Map<String, Object>> metricsMsg = new HashMap<>();
+
+ // metrics
+ registry.getGroups().forEach(group -> {
+ Map<String, Object> groupMsg = new HashMap<>();
+ registry.getGroup(group).forEach((name, metric) -> {
+ if (!shouldIgnore(group, name)) {
+ metric.visit(new MetricsVisitor() {
+ @Override
+ public void counter(Counter counter) {
+ groupMsg.put(name, counter.getCount());
+ }
+
+ @Override
+ public <T> void gauge(Gauge<T> gauge) {
+ groupMsg.put(name, gauge.getValue());
+ }
+
+ @Override
+ public void timer(Timer timer) {
+ groupMsg.put(name, timer.getSnapshot().getAverage());
+ }
+ });
+ }
+ });
+
+ // dont emit empty groups
+ if (!groupMsg.isEmpty()) {
+ metricsMsg.put(group, groupMsg);
+ }
+ });
+
+ // publish to Kafka only if the metricsMsg carries any metrics
+ if (!metricsMsg.isEmpty()) {
+ MetricsHeader header =
+ new MetricsHeader(this.jobName, this.jobId, this.containerName,
this.execEnvironmentContainerId, source,
+ this.version, this.samzaVersion, this.host,
this.clock.currentTimeMillis(), this.resetTime);
+ Metrics metrics = new Metrics(metricsMsg);
+ LOG.debug("Flushing metrics for {} to {} with header and map:
header={}, map={}.", source, out,
+ header.getAsMap(), metrics.getAsMap());
+ MetricsSnapshot metricsSnapshot = new MetricsSnapshot(header, metrics);
+ Object maybeSerialized = (this.serializer != null) ?
this.serializer.toBytes(metricsSnapshot) : metricsSnapshot;
+ try {
+ this.producer.send(source, new OutgoingMessageEnvelope(this.out,
this.host, null, maybeSerialized));
+ // Always flush, since we don't want metrics to get batched up.
+ this.producer.flush(source);
+ } catch (Exception e) {
+ LOG.error(String.format("Exception when flushing metrics for source
%s", source), e);
+ }
+ }
+ }
+ LOG.debug("Finished flushing metrics.");
+ }
+
+ protected boolean shouldIgnore(String group, String metricName) {
+ boolean isBlacklisted = this.blacklist.isPresent();
+ String fullMetricName = group + "." + metricName;
+
+ if (isBlacklisted && !this.blacklistedMetrics.contains(fullMetricName)) {
+ if (this.blacklist.get().matcher(fullMetricName).matches()) {
+ this.blacklistedMetrics.add(fullMetricName);
+ LOG.debug("Samza diagnostics: blacklisted metric {} because it matched
blacklist regex: {}", fullMetricName,
+ this.blacklist.get());
+ } else {
+ isBlacklisted = false;
+ }
+ }
+ return isBlacklisted;
+ }
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.java
b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.java
new file mode 100644
index 0000000..1c89c78
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics.reporter;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.SystemConfig;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.MetricsReporterFactory;
+import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.StreamUtil;
+import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MetricsSnapshotReporterFactory implements MetricsReporterFactory {
+ private static final Logger LOG =
LoggerFactory.getLogger(MetricsSnapshotReporterFactory.class);
+
+ @Override
+ public MetricsReporter getMetricsReporter(String reporterName, String
containerName, Config config) {
+ LOG.info("Creating new metrics snapshot reporter.");
+ MetricsRegistryMap registry = new MetricsRegistryMap();
+
+ SystemStream systemStream = getSystemStream(reporterName, config);
+ SystemProducer producer = getProducer(reporterName, config, registry);
+ Duration reportingInterval =
Duration.ofSeconds(getReportingInterval(reporterName, config));
+ String jobName = getJobName(config);
+ String jobId = getJobId(config);
+ Serde<MetricsSnapshot> serde = getSerde(reporterName, config);
+ Optional<Pattern> blacklist = getBlacklist(reporterName, config);
+
+ MetricsSnapshotReporter reporter =
+ new MetricsSnapshotReporter(producer, systemStream, reportingInterval,
jobName, jobId, containerName,
+ Util.getTaskClassVersion(config), Util.getSamzaVersion(),
Util.getLocalHost().getHostName(), serde,
+ blacklist, SystemClock.instance());
+ reporter.register(this.getClass().getSimpleName(), registry);
+ return reporter;
+ }
+
+ protected SystemProducer getProducer(String reporterName, Config config,
MetricsRegistryMap registry) {
+ SystemConfig systemConfig = new SystemConfig(config);
+ String systemName = getSystemStream(reporterName, config).getSystem();
+ String systemFactoryClassName = systemConfig.getSystemFactory(systemName)
+ .orElseThrow(() -> new SamzaException(
+ String.format("Trying to fetch system factory for system %s, which
isn't defined in config.", systemName)));
+ SystemFactory systemFactory =
ReflectionUtil.getObj(systemFactoryClassName, SystemFactory.class);
+ LOG.info("Got system factory {}.", systemFactory);
+ SystemProducer producer = systemFactory.getProducer(systemName, config,
registry);
+ LOG.info("Got producer {}.", producer);
+ return producer;
+ }
+
+ protected SystemStream getSystemStream(String reporterName, Config config) {
+ MetricsConfig metricsConfig = new MetricsConfig(config);
+ String metricsSystemStreamName =
metricsConfig.getMetricsSnapshotReporterStream(reporterName)
+ .orElseThrow(() -> new SamzaException("No metrics stream defined in
config."));
+ SystemStream systemStream =
StreamUtil.getSystemStreamFromNames(metricsSystemStreamName);
+ LOG.info("Got system stream {}.", systemStream);
+ return systemStream;
+ }
+
+ protected Serde<MetricsSnapshot> getSerde(String reporterName, Config
config) {
+ StreamConfig streamConfig = new StreamConfig(config);
+ SystemConfig systemConfig = new SystemConfig(config);
+ SystemStream systemStream = getSystemStream(reporterName, config);
+
+ Optional<String> streamSerdeName =
streamConfig.getStreamMsgSerde(systemStream);
+ Optional<String> systemSerdeName =
systemConfig.getSystemMsgSerde(systemStream.getSystem());
+ String serdeName = streamSerdeName.orElse(systemSerdeName.orElse(null));
+ SerializerConfig serializerConfig = new SerializerConfig(config);
+ Serde<MetricsSnapshot> serde;
+ if (serdeName != null) {
+ Optional<String> serdeFactoryClass =
serializerConfig.getSerdeFactoryClass(serdeName);
+ if (serdeFactoryClass.isPresent()) {
+ SerdeFactory<MetricsSnapshot> serdeFactory =
ReflectionUtil.getObj(serdeFactoryClass.get(), SerdeFactory.class);
+ serde = serdeFactory.getSerde(serdeName, config);
+ } else {
+ serde = null;
+ }
+ } else {
+ serde = new MetricsSnapshotSerdeV2();
+ }
+ LOG.info("Got serde {}.", serde);
+ return serde;
+ }
+
+ protected Optional<Pattern> getBlacklist(String reporterName, Config config)
{
+ MetricsConfig metricsConfig = new MetricsConfig(config);
+ Optional<String> blacklist =
metricsConfig.getMetricsSnapshotReporterBlacklist(reporterName);
+ LOG.info("Got blacklist as: {}", blacklist);
+ return blacklist.map(Pattern::compile);
+ }
+
+ protected int getReportingInterval(String reporterName, Config config) {
+ MetricsConfig metricsConfig = new MetricsConfig(config);
+ int reportingInterval =
metricsConfig.getMetricsSnapshotReporterInterval(reporterName);
+ LOG.info("Got reporting interval: {}", reportingInterval);
+ return reportingInterval;
+ }
+
+ protected String getJobId(Config config) {
+ JobConfig jobConfig = new JobConfig(config);
+ return jobConfig.getJobId();
+ }
+
+ protected String getJobName(Config config) {
+ JobConfig jobConfig = new JobConfig(config);
+ return jobConfig.getName().orElseThrow(() -> new SamzaException("Job name
must be defined in config."));
+ }
+}
diff --git
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
deleted file mode 100644
index 218157e..0000000
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.metrics.reporter
-
-import java.util.{Collections, HashMap, Map}
-import scala.collection.JavaConverters._
-
-object Metrics {
- def fromMap(map: Map[String, Map[String, Object]]): Metrics = {
- new Metrics(map)
- }
-}
-
-/**
- * Immutable metrics snapshot.
- */
-class Metrics(metrics: Map[String, Map[String, Object]]) {
- val immutableMetrics = new HashMap[String, Map[String, Object]]
-
- for ((groupKey, groupValue) <- metrics.asScala) {
- val immutableMetricGroup = new HashMap[String, Object]
-
- for ((metricKey, metricValue) <- groupValue.asScala) {
- immutableMetricGroup.put(metricKey, metricValue)
- }
-
- immutableMetrics.put(groupKey,
Collections.unmodifiableMap(immutableMetricGroup))
- }
-
- def get[T](group: String, metricName: String) =
- immutableMetrics.get(group).get(metricName).asInstanceOf[T]
-
- def get(group: String) = immutableMetrics.get(group)
-
- def getAsMap(): Map[String, Map[String, Object]] =
Collections.unmodifiableMap(immutableMetrics)
-
- // default constructor to enable deserialization by MetricsSnapshotSerdeV2
- def this() {
- this(new HashMap[String, Map[String, Object]]())
- }
-}
diff --git
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
deleted file mode 100644
index 2fef04e..0000000
---
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.metrics.reporter
-
-import java.util.HashMap
-import java.util.Map
-import scala.beans.BeanProperty
-
-object MetricsHeader {
- def fromMap(map: Map[String, Object]): MetricsHeader = {
- new MetricsHeader(
- map.get("job-name").toString,
- map.get("job-id").toString,
- map.get("container-name").toString,
- map.get("exec-env-container-id").toString,
- map.get("source").toString,
- map.get("version").toString,
- map.get("samza-version").toString,
- map.get("host").toString,
- map.get("time").asInstanceOf[Number].longValue,
- map.get("reset-time").asInstanceOf[Number].longValue)
- }
-}
-
-/**
- * Immutable metric header snapshot.
- */
-class MetricsHeader(
- @BeanProperty val jobName: String,
- @BeanProperty val jobId: String,
- @BeanProperty val containerName: String,
- @BeanProperty val execEnvironmentContainerId: String,
- @BeanProperty val source: String,
- @BeanProperty val version: String,
- @BeanProperty val samzaVersion: String,
- @BeanProperty val host: String,
- @BeanProperty val time: Long,
- @BeanProperty val resetTime: Long) {
-
- def getAsMap: Map[String, Object] = {
- val map = new HashMap[String, Object]
- map.put("job-name", jobName)
- map.put("job-id", jobId)
- map.put("container-name", containerName)
- map.put("exec-env-container-id", execEnvironmentContainerId)
- map.put("source", source)
- map.put("version", version)
- map.put("samza-version", samzaVersion)
- map.put("host", host)
- map.put("time", time: java.lang.Long)
- map.put("reset-time", resetTime: java.lang.Long)
- map
- }
-}
diff --git
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
deleted file mode 100644
index 7a0d79f..0000000
---
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.metrics.reporter
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.samza.metrics._
-import org.apache.samza.serializers.Serializer
-import org.apache.samza.system.OutgoingMessageEnvelope
-import org.apache.samza.system.SystemProducer
-import org.apache.samza.system.SystemStream
-import org.apache.samza.util.Logging
-import java.util.HashMap
-import java.util.Map
-import java.util.concurrent.Executors
-import java.util.concurrent.TimeUnit
-
-import org.apache.samza.config.ShellCommandConfig
-
-import scala.collection.JavaConverters._
-
-/**
- * MetricsSnapshotReporter is a generic metrics reporter that sends metrics to
a stream.
- *
- * jobName // my-samza-job
- * jobId // an id that differentiates multiple executions of the same job
- * taskName // container_567890
- * host // eat1-app128.gird
- * version // 0.0.1
- * blacklist // Regex of metrics to ignore when flushing
- */
-class MetricsSnapshotReporter(
- producer: SystemProducer,
- out: SystemStream,
- reportingInterval: Int,
- jobName: String,
- jobId: String,
- containerName: String,
- version: String,
- samzaVersion: String,
- host: String,
- serializer: Serializer[MetricsSnapshot] = null,
- blacklist: Option[String],
- clock: () => Long = () => { System.currentTimeMillis }) extends
MetricsReporter with Runnable with Logging {
-
- val execEnvironmentContainerId =
Option[String](System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID)).getOrElse("")
-
- val executor = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter
Thread-%d").setDaemon(true).build())
- val resetTime = clock()
- var registries = List[(String, ReadableMetricsRegistry)]()
- var blacklistedMetrics = Set[String]()
-
- info("got metrics snapshot reporter properties [job name: %s, job id: %s,
containerName: %s, version: %s, samzaVersion: %s, host: %s, reportingInterval
%s]"
- format(jobName, jobId, containerName, version, samzaVersion, host,
reportingInterval))
-
- def start {
- info("Starting producer.")
-
- producer.start
-
- info("Starting reporter timer.")
-
- executor.scheduleWithFixedDelay(this, 0, reportingInterval,
TimeUnit.SECONDS)
- }
-
- def register(source: String, registry: ReadableMetricsRegistry) {
- registries ::= (source, registry)
-
- info("Registering %s with producer." format source)
-
- producer.register(source)
- }
-
- def stop = {
-
- // Scheduling an event with 0 delay to ensure flushing of metrics one last
time before shutdown
- executor.schedule(this, 0, TimeUnit.SECONDS)
-
- info("Stopping reporter timer.")
- // Allow the scheduled task above to finish, and block for termination
(for max 60 seconds)
- executor.shutdown
- executor.awaitTermination(60, TimeUnit.SECONDS)
-
- info("Stopping producer.")
- producer.stop
-
- if (!executor.isTerminated) {
- warn("Unable to shutdown reporter timer.")
- }
- }
-
- def run() {
- try {
- innerRun()
- } catch {
- case e: Exception =>
- // Ignore all exceptions - because subsequent executions of this
scheduled task will be suppressed
- // by the executor if the current task throws an unhandled exception.
- warn("Error while reporting metrics. Will retry in " +
reportingInterval + " seconds.", e)
- }
-
- }
-
- def innerRun(): Unit = {
- debug("Begin flushing metrics.")
- for ((source, registry) <- registries) {
- debug("Flushing metrics for %s." format source)
-
- val metricsMsg = new HashMap[String, Map[String, Object]]
-
- // metrics
- registry.getGroups.asScala.foreach(group => {
- val groupMsg = new HashMap[String, Object]
-
- registry.getGroup(group).asScala.foreach {
- case (name, metric) =>
- if (!shouldIgnore(group, name)) {
- metric.visit(new MetricsVisitor {
- def counter(counter: Counter) = groupMsg.put(name,
counter.getCount: java.lang.Long)
- def gauge[T](gauge: Gauge[T]) = groupMsg.put(name,
gauge.getValue.asInstanceOf[Object])
- def timer(timer: Timer) = groupMsg.put(name,
timer.getSnapshot().getAverage(): java.lang.Double)
- })
- }
- }
-
- // dont emit empty groups
- if (!groupMsg.isEmpty) {
- metricsMsg.put(group, groupMsg)
- }
- })
-
- // publish to Kafka only if the metricsMsg carries any metrics
- if (!metricsMsg.isEmpty) {
- val header = new MetricsHeader(jobName, jobId, containerName,
execEnvironmentContainerId, source, version, samzaVersion, host, clock(),
resetTime)
- val metrics = new Metrics(metricsMsg)
-
- debug("Flushing metrics for %s to %s with header and map: header=%s,
map=%s." format(source, out, header.getAsMap, metrics.getAsMap()))
-
- val metricsSnapshot = new MetricsSnapshot(header, metrics)
- val maybeSerialized = if (serializer != null) {
- serializer.toBytes(metricsSnapshot)
- } else {
- metricsSnapshot
- }
-
- try {
-
- producer.send(source, new OutgoingMessageEnvelope(out, host, null,
maybeSerialized))
-
- // Always flush, since we don't want metrics to get batched up.
- producer.flush(source)
- } catch {
- case e: Exception => error("Exception when flushing metrics for
source %s " format (source), e)
- }
- }
- }
- debug("Finished flushing metrics.")
- }
-
- def shouldIgnore(group: String, metricName: String) = {
- var isBlacklisted = blacklist.isDefined
- val fullMetricName = group + "." + metricName
-
- if (isBlacklisted && !blacklistedMetrics.contains(fullMetricName)) {
- if (fullMetricName.matches(blacklist.get)) {
- blacklistedMetrics += fullMetricName
- debug("Blacklisted metric %s because it matched blacklist regex: %s"
format(fullMetricName, blacklist.get))
- } else {
- isBlacklisted = false
- }
- }
-
- isBlacklisted
- }
-}
diff --git
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
deleted file mode 100644
index 2f9a0ba..0000000
---
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.metrics.reporter
-
-import org.apache.samza.SamzaException
-import org.apache.samza.config._
-import org.apache.samza.metrics.{MetricsRegistryMap, MetricsReporter,
MetricsReporterFactory}
-import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, Serde,
SerdeFactory}
-import org.apache.samza.system.{SystemFactory, SystemProducer, SystemStream}
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
-import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
-
-class MetricsSnapshotReporterFactory extends MetricsReporterFactory with
Logging {
-
- protected def getProducer(reporterName: String, config: Config, registry:
MetricsRegistryMap): SystemProducer = {
- val systemConfig = new SystemConfig(config)
- val systemName = getSystemStream(reporterName, config).getSystem
- val systemFactoryClassName =
JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
- .getOrElse(throw new SamzaException("Trying to fetch system factory for
system %s, which isn't defined in config." format systemName))
- val systemFactory = ReflectionUtil.getObj(systemFactoryClassName,
classOf[SystemFactory])
-
- info("Got system factory %s." format systemFactory)
- val producer = systemFactory.getProducer(systemName, config, registry)
- info("Got producer %s." format producer)
-
- producer
- }
-
- protected def getSystemStream(reporterName: String, config: Config):
SystemStream = {
- val metricsConfig = new MetricsConfig(config)
- val metricsSystemStreamName =
JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterStream(reporterName))
- .toOption
- .getOrElse(throw new SamzaException("No metrics stream defined in
config."))
- val systemStream =
StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
- info("Got system stream %s." format systemStream)
- systemStream
- }
-
- protected def getSerde(reporterName: String, config: Config):
Serde[MetricsSnapshot] = {
- val streamConfig = new StreamConfig(config)
- val systemConfig = new SystemConfig(config)
- val systemStream = getSystemStream(reporterName, config)
-
- val streamSerdeName = streamConfig.getStreamMsgSerde(systemStream)
- val systemSerdeName =
systemConfig.getSystemMsgSerde(systemStream.getSystem)
- val serdeName = streamSerdeName.orElse(systemSerdeName.orElse(null))
- val serializerConfig = new SerializerConfig(config)
- val serde = if (serdeName != null) {
-
JavaOptionals.toRichOptional(serializerConfig.getSerdeFactoryClass(serdeName)).toOption
match {
- case Some(serdeClassName) =>
- ReflectionUtil.getObj(serdeClassName,
classOf[SerdeFactory[MetricsSnapshot]]).getSerde(serdeName, config)
- case _ => null
- }
- } else {
- new MetricsSnapshotSerdeV2
- }
- info("Got serde %s." format serde)
- serde
- }
-
-
- protected def getBlacklist(reporterName: String, config: Config):
Option[String] = {
- val metricsConfig = new MetricsConfig(config)
- val blacklist =
JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterBlacklist(reporterName)).toOption
- info("Got blacklist as: %s" format blacklist)
- blacklist
- }
-
- protected def getReportingInterval(reporterName: String, config: Config):
Int = {
- val metricsConfig = new MetricsConfig(config)
- val reportingInterval =
metricsConfig.getMetricsSnapshotReporterInterval(reporterName)
- info("Got reporting interval: %d" format reportingInterval)
- reportingInterval
- }
-
- protected def getJobId(config: Config): String = {
- val jobConfig = new JobConfig(config)
- jobConfig.getJobId
- }
-
- protected def getJobName(config: Config): String = {
- val jobConfig = new JobConfig(config)
- JavaOptionals.toRichOptional(jobConfig.getName).toOption
- .getOrElse(throw new SamzaException("Job name must be defined in
config."))
- }
-
-
- def getMetricsReporter(reporterName: String, containerName: String, config:
Config): MetricsReporter = {
- info("Creating new metrics snapshot reporter.")
- val registry = new MetricsRegistryMap
-
- val systemStream = getSystemStream(reporterName, config)
- val producer = getProducer(reporterName, config, registry)
- val reportingInterval = getReportingInterval(reporterName, config);
- val jobName = getJobName(config)
- val jobId = getJobId(config)
- val serde = getSerde(reporterName, config)
- val blacklist = getBlacklist(reporterName, config)
-
- val reporter = new MetricsSnapshotReporter(
- producer,
- systemStream,
- reportingInterval,
- jobName,
- jobId,
- containerName,
- Util.getTaskClassVersion(config),
- Util.getSamzaVersion,
- Util.getLocalHost.getHostName,
- serde, blacklist)
-
- reporter.register(this.getClass.getSimpleName, registry)
-
- reporter
- }
-}
diff --git
a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetrics.java
b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetrics.java
new file mode 100644
index 0000000..73cda80
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetrics.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics.reporter;
+
+import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+public class TestMetrics {
+ @Test
+ public void testMetrics() {
+ Map<String, Object> group0Metrics = ImmutableMap.of("int-value", 10,
"boolean-value", true);
+ Map<String, Object> group1Metrics =
+ ImmutableMap.of("string-value", "str", "map-value",
ImmutableMap.of("a", "aa", "b", "bb"));
+ Map<String, Map<String, Object>> metricsMap = ImmutableMap.of("group0",
group0Metrics, "group1", group1Metrics);
+ Metrics metrics = new Metrics(metricsMap);
+ ImmutableMap<String, Object> group0MetricsCopy =
ImmutableMap.copyOf(group0Metrics);
+ ImmutableMap<String, Object> group1MetricsCopy =
ImmutableMap.copyOf(group1Metrics);
+ assertEquals(ImmutableMap.of("group0", group0MetricsCopy, "group1",
group1MetricsCopy), metrics.getAsMap());
+ assertEquals(group0MetricsCopy, metrics.get("group0"));
+ assertEquals(group1MetricsCopy, metrics.get("group1"));
+ assertNull(metrics.get("group2"));
+ assertEquals(new Integer(10), metrics.get("group0", "int-value"));
+ assertEquals(Boolean.TRUE, metrics.get("group0", "boolean-value"));
+ assertNull(metrics.get("group0", "key-does-not-exist"));
+ assertEquals("str", metrics.get("group1", "string-value"));
+ assertEquals(ImmutableMap.of("a", "aa", "b", "bb"), metrics.get("group1",
"map-value"));
+ }
+}
\ No newline at end of file
diff --git
a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsHeader.java
b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsHeader.java
new file mode 100644
index 0000000..bbd6b30
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsHeader.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics.reporter;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestMetricsHeader {
+ private static final String JOB_NAME = "job-a";
+ private static final String JOB_ID = "id-a";
+ private static final String CONTAINER_NAME = "samza-container-0";
+ private static final String EXEC_ENV_CONTAINER_ID = "container-12345";
+ private static final String SOURCE = "metrics-source";
+ private static final String VERSION = "1.2.3";
+ private static final String SAMZA_VERSION = "4.5.6";
+ private static final String HOST = "host0.a.b.c";
+ private static final long TIME = 100;
+ private static final long RESET_TIME = 10;
+
+ @Test
+ public void testGetAsMap() {
+ MetricsHeader metricsHeader =
+ new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME,
EXEC_ENV_CONTAINER_ID, SOURCE, VERSION, SAMZA_VERSION, HOST,
+ TIME, RESET_TIME);
+ Map<String, Object> expected = new HashMap<>();
+ expected.put("job-name", JOB_NAME);
+ expected.put("job-id", JOB_ID);
+ expected.put("container-name", CONTAINER_NAME);
+ expected.put("exec-env-container-id", EXEC_ENV_CONTAINER_ID);
+ expected.put("source", SOURCE);
+ expected.put("version", VERSION);
+ expected.put("samza-version", SAMZA_VERSION);
+ expected.put("host", HOST);
+ expected.put("time", TIME);
+ expected.put("reset-time", RESET_TIME);
+ assertEquals(expected, metricsHeader.getAsMap());
+ }
+
+ @Test
+ public void testFromMap() {
+ Map<String, Object> map = new HashMap<>();
+ map.put("job-name", JOB_NAME);
+ map.put("job-id", JOB_ID);
+ map.put("container-name", CONTAINER_NAME);
+ map.put("exec-env-container-id", EXEC_ENV_CONTAINER_ID);
+ map.put("source", SOURCE);
+ map.put("version", VERSION);
+ map.put("samza-version", SAMZA_VERSION);
+ map.put("host", HOST);
+ map.put("time", TIME);
+ map.put("reset-time", RESET_TIME);
+ MetricsHeader expected =
+ new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME,
EXEC_ENV_CONTAINER_ID, SOURCE, VERSION, SAMZA_VERSION, HOST,
+ TIME, RESET_TIME);
+ assertEquals(expected, MetricsHeader.fromMap(map));
+ }
+}
diff --git
a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshot.java
b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshot.java
new file mode 100644
index 0000000..9b5c693
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshot.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics.reporter;
+
+import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestMetricsSnapshot {
+ private static final MetricsHeader METRICS_HEADER =
+ new MetricsHeader("job", "id", "container", "container-id", "source",
"1.2", "3.4", "a.b.c", 100, 10);
+ private static final Metrics METRICS =
+ new Metrics(ImmutableMap.of("group0", ImmutableMap.of("a", "b"),
"group1", ImmutableMap.of("c", "d")));
+
+ @Test
+ public void testGetAsMap() {
+ MetricsSnapshot metricsSnapshot = new MetricsSnapshot(METRICS_HEADER,
METRICS);
+ Map<String, Object> expected = ImmutableMap.of("header",
METRICS_HEADER.getAsMap(), "metrics", METRICS.getAsMap());
+ assertEquals(expected, metricsSnapshot.getAsMap());
+ }
+
+ @Test
+ public void testFromMap() {
+ Map<String, Map<String, Object>> map =
+ ImmutableMap.of("header",
ImmutableMap.copyOf(METRICS_HEADER.getAsMap()), "metrics",
+ ImmutableMap.copyOf((Map<String, Object>) (Map<?, ?>)
METRICS.getAsMap()));
+ MetricsSnapshot expected = new MetricsSnapshot(METRICS_HEADER, METRICS);
+ assertEquals(expected, MetricsSnapshot.fromMap(map));
+ }
+}
\ No newline at end of file
diff --git
a/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java
b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporter.java
similarity index 86%
rename from
samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java
rename to
samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporter.java
index 1f69a7e..c761486 100644
---
a/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java
+++
b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporter.java
@@ -17,28 +17,29 @@
* under the License.
*/
-package org.apache.samza.metrics;
+package org.apache.samza.metrics.reporter;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
-import org.apache.samza.metrics.reporter.MetricsSnapshot;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
import org.apache.samza.serializers.Serializer;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.SystemClock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import scala.Some;
-import scala.runtime.AbstractFunction0;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.verify;
public class TestMetricsSnapshotReporter {
@@ -56,7 +57,7 @@ public class TestMetricsSnapshotReporter {
private static final String TASK_VERSION = "test version";
private static final String SAMZA_VERSION = "test samza version";
private static final String HOSTNAME = "test host";
- private static final int REPORTING_INTERVAL = 60000;
+ private static final Duration REPORTING_INTERVAL = Duration.ofSeconds(60000);
private Serializer<MetricsSnapshot> serializer;
private SystemProducer producer;
@@ -69,7 +70,7 @@ public class TestMetricsSnapshotReporter {
@Test
public void testBlacklistAll() {
- this.metricsSnapshotReporter = getMetricsSnapshotReporter(BLACKLIST_ALL);
+ this.metricsSnapshotReporter =
getMetricsSnapshotReporter(Optional.of(BLACKLIST_ALL));
Assert.assertTrue("Should ignore all metrics",
this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.system.kafka.KafkaSystemProducerMetrics",
@@ -84,7 +85,7 @@ public class TestMetricsSnapshotReporter {
@Test
public void testBlacklistNone() {
- this.metricsSnapshotReporter = getMetricsSnapshotReporter(BLACKLIST_NONE);
+ this.metricsSnapshotReporter =
getMetricsSnapshotReporter(Optional.of(BLACKLIST_NONE));
Assert.assertFalse("Should not ignore any metrics",
this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.system.kafka.KafkaSystemProducerMetrics",
@@ -99,7 +100,7 @@ public class TestMetricsSnapshotReporter {
@Test
public void testBlacklistGroup() {
- this.metricsSnapshotReporter =
getMetricsSnapshotReporter(BLACKLIST_GROUPS);
+ this.metricsSnapshotReporter =
getMetricsSnapshotReporter(Optional.of(BLACKLIST_GROUPS));
Assert.assertTrue("Should ignore all metrics from this group",
this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.system.SystemConsumersMetrics",
"poll-ns"));
@@ -118,7 +119,7 @@ public class TestMetricsSnapshotReporter {
@Test
public void testBlacklistAllButTwoGroups() {
- this.metricsSnapshotReporter =
getMetricsSnapshotReporter(BLACKLIST_ALL_BUT_TWO_GROUPS);
+ this.metricsSnapshotReporter =
getMetricsSnapshotReporter(Optional.of(BLACKLIST_ALL_BUT_TWO_GROUPS));
Assert.assertFalse("Should not ignore this group",
this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.system.SystemConsumersMetrics",
"poll-ns"));
@@ -141,7 +142,7 @@ public class TestMetricsSnapshotReporter {
String metricName = "someName";
MetricsRegistryMap registry = new MetricsRegistryMap();
- metricsSnapshotReporter =
getMetricsSnapshotReporter(TestMetricsSnapshotReporter.BLACKLIST_NONE);
+ metricsSnapshotReporter = getMetricsSnapshotReporter(Optional.empty());
registry.newGauge(group, metricName, 42);
metricsSnapshotReporter.register(source, registry);
@@ -176,17 +177,8 @@ public class TestMetricsSnapshotReporter {
Assert.assertEquals(42, metricMap.get(group).get(metricName));
}
- private MetricsSnapshotReporter getMetricsSnapshotReporter(String blacklist)
{
+ private MetricsSnapshotReporter getMetricsSnapshotReporter(Optional<String>
blacklist) {
return new MetricsSnapshotReporter(producer, SYSTEM_STREAM,
REPORTING_INTERVAL, JOB_NAME, JOB_ID, CONTAINER_NAME,
- TASK_VERSION, SAMZA_VERSION, HOSTNAME, serializer, new
Some<>(blacklist), getClock());
- }
-
- private AbstractFunction0<Object> getClock() {
- return new AbstractFunction0<Object>() {
- @Override
- public Object apply() {
- return System.currentTimeMillis();
- }
- };
+ TASK_VERSION, SAMZA_VERSION, HOSTNAME, serializer,
blacklist.map(Pattern::compile), SystemClock.instance());
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporterFactory.java
b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporterFactory.java
new file mode 100644
index 0000000..e7239e1
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporterFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics.reporter;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+
+public class TestMetricsSnapshotReporterFactory {
+ private static final String REPORTER = "metrics-reporter";
+ private static final SystemProducer SYSTEM_PRODUCER =
mock(SystemProducer.class);
+ private static final Serde<MetricsSnapshot> SERDE = mock(Serde.class);
+
+ private MetricsSnapshotReporterFactory factory;
+
+ @Before
+ public void setup() {
+ this.factory = new MetricsSnapshotReporterFactory();
+ }
+
+ @Test
+ public void testGetProducer() {
+ Config config = new MapConfig(
+ ImmutableMap.of("metrics.reporter.metrics-reporter.stream",
"system0.stream0", "systems.system0.samza.factory",
+ MockSystemFactory.class.getName()));
+ assertEquals(SYSTEM_PRODUCER, this.factory.getProducer(REPORTER, config,
new MetricsRegistryMap()));
+ }
+
+ @Test
+ public void testGetSystemStream() {
+ Config config = new
MapConfig(ImmutableMap.of("metrics.reporter.metrics-reporter.stream",
"system0.stream0"));
+ assertEquals(new SystemStream("system0", "stream0"),
this.factory.getSystemStream(REPORTER, config));
+ }
+
+ @Test
+ public void testGetSerdeConfigured() {
+ Config config = new MapConfig(
+ ImmutableMap.of("metrics.reporter.metrics-reporter.stream",
"system0.stream0", "streams.stream0.samza.system",
+ "system0", "streams.stream0.samza.msg.serde", "snapshot-serde",
"serializers.registry.snapshot-serde.class",
+ MockSerdeFactory.class.getName()));
+ assertEquals(SERDE, this.factory.getSerde(REPORTER, config));
+ }
+
+ @Test
+ public void testGetSerdeNoSerdeFactory() {
+ Config config = new MapConfig(
+ ImmutableMap.of("metrics.reporter.metrics-reporter.stream",
"system0.stream0", "streams.stream0.samza.system",
+ "system0", "streams.stream0.samza.msg.serde", "snapshot-serde"));
+ assertNull(this.factory.getSerde(REPORTER, config));
+ }
+
+ @Test
+ public void testGetSerdeFallback() {
+ Config config = new MapConfig(
+ ImmutableMap.of("metrics.reporter.metrics-reporter.stream",
"system0.stream0", "streams.stream0.samza.system",
+ "system0"));
+ assertTrue(this.factory.getSerde(REPORTER, config) instanceof
MetricsSnapshotSerdeV2);
+ }
+
+ public static class MockSystemFactory implements SystemFactory {
+ @Override
+ public SystemConsumer getConsumer(String systemName, Config config,
MetricsRegistry registry) {
+ throw new UnsupportedOperationException("Unnecessary for test");
+ }
+
+ @Override
+ public SystemProducer getProducer(String systemName, Config config,
MetricsRegistry registry) {
+ return SYSTEM_PRODUCER;
+ }
+
+ @Override
+ public SystemAdmin getAdmin(String systemName, Config config) {
+ throw new UnsupportedOperationException("Unnecessary for test");
+ }
+ }
+
+ public static class MockSerdeFactory implements
SerdeFactory<MetricsSnapshot> {
+ @Override
+ public Serde<MetricsSnapshot> getSerde(String name, Config config) {
+ return SERDE;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/samza-core/src/test/java/org/apache/samza/serializers/TestMetricsSnapshotSerde.java
b/samza-core/src/test/java/org/apache/samza/serializers/TestMetricsSnapshotSerde.java
new file mode 100644
index 0000000..b75cbde
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/serializers/TestMetricsSnapshotSerde.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.serializers;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.metrics.reporter.Metrics;
+import org.apache.samza.metrics.reporter.MetricsHeader;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+
+public class TestMetricsSnapshotSerde {
+ private static final String SERIALIZED =
+ "{\"header\":{\"job-id\":\"testjobid\",\"exec-env-container-id\":\"test
exec env container
id\",\"samza-version\":\"samzaversion\",\"job-name\":\"test-jobName\",\"host\":\"host\",\"reset-time\":2,\"container-name\":\"samza-container-0\",\"source\":\"test
source\",\"time\":1,\"version\":\"version\"},\"metrics\":{\"test\":{\"test2\":\"foo\"}}}";
+
+ @Test
+ public void testMetricsSerdeSerializeAndDeserialize() {
+ Map<String, Object> metricsMap = new HashMap<>();
+ metricsMap.put("test2", "foo");
+ Map<String, Map<String, Object>> metricsGroupMap = new HashMap<>();
+ metricsGroupMap.put("test", metricsMap);
+ MetricsSnapshot snapshot = new MetricsSnapshot(metricsHeader(),
Metrics.fromMap(metricsGroupMap));
+ MetricsSnapshotSerde serde = new MetricsSnapshotSerde();
+ byte[] bytes = serde.toBytes(snapshot);
+ assertEquals(snapshot, serde.fromBytes(bytes));
+ }
+
+ /**
+ * Helps for testing compatibility against older versions of code.
+ */
+ @Test
+ public void testMetricsSerdeSerialize() {
+ Map<String, Object> metricsMap = new HashMap<>();
+ metricsMap.put("test2", "foo");
+ Map<String, Map<String, Object>> metricsGroupMap = new HashMap<>();
+ metricsGroupMap.put("test", metricsMap);
+ MetricsSnapshot snapshot = new MetricsSnapshot(metricsHeader(),
Metrics.fromMap(metricsGroupMap));
+ MetricsSnapshotSerde serde = new MetricsSnapshotSerde();
+ assertArrayEquals(SERIALIZED.getBytes(StandardCharsets.UTF_8),
serde.toBytes(snapshot));
+ }
+
+ /**
+ * Helps for testing compatibility against older versions of code.
+ */
+ @Test
+ public void testMetricsSerdeDeserialize() {
+ Map<String, Object> metricsMap = new HashMap<>();
+ metricsMap.put("test2", "foo");
+ Map<String, Map<String, Object>> metricsGroupMap = new HashMap<>();
+ metricsGroupMap.put("test", metricsMap);
+ MetricsSnapshot snapshot = new MetricsSnapshot(metricsHeader(),
Metrics.fromMap(metricsGroupMap));
+ MetricsSnapshotSerde serde = new MetricsSnapshotSerde();
+ assertEquals(snapshot,
serde.fromBytes(SERIALIZED.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ private static MetricsHeader metricsHeader() {
+ return new MetricsHeader("test-jobName", "testjobid", "samza-container-0",
"test exec env container id",
+ "test source", "version", "samzaversion", "host", 1L, 2L);
+ }
+}
diff --git
a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
index b0b65e0..6a3dc21 100644
---
a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
+++
b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
@@ -19,51 +19,77 @@
package org.apache.samza.serializers.model.serializers;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.samza.SamzaException;
-import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
import org.apache.samza.diagnostics.BoundedList;
+import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
import org.apache.samza.metrics.reporter.Metrics;
import org.apache.samza.metrics.reporter.MetricsHeader;
import org.apache.samza.metrics.reporter.MetricsSnapshot;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
-import org.junit.Assert;
import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
-public class TestMetricsSnapshotSerdeV2 {
+public class TestMetricsSnapshotSerdeV2 {
@Test
- public void testSerde() {
- MetricsHeader metricsHeader =
- new MetricsHeader("jobName", "i001", "container 0", "test container
ID", "source", "300.14.25.1", "1", "1", 1,
- 1);
-
- BoundedList boundedList = new
BoundedList<DiagnosticsExceptionEvent>("exceptions");
- DiagnosticsExceptionEvent diagnosticsExceptionEvent1 =
- new DiagnosticsExceptionEvent(1, new SamzaException("this is a samza
exception", new RuntimeException("cause")),
- new HashMap());
-
- boundedList.add(diagnosticsExceptionEvent1);
-
- String samzaContainerMetricsGroupName =
"org.apache.samza.container.SamzaContainerMetrics";
- Map<String, Map<String, Object>> metricMessage = new HashMap<>();
- metricMessage.put(samzaContainerMetricsGroupName, new HashMap<>());
- metricMessage.get(samzaContainerMetricsGroupName).put("exceptions",
boundedList.getValues());
- metricMessage.get(samzaContainerMetricsGroupName).put("commit-calls", 0);
-
- MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new
Metrics(metricMessage));
+ public void testSerializeAndDeserialize() {
+ SamzaException samzaException = new SamzaException("this is a samza
exception", new RuntimeException("cause"));
+ MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException);
+ MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
+ byte[] serializedBytes = metricsSnapshotSerde.toBytes(metricsSnapshot);
+ MetricsSnapshot deserializedMetricsSnapshot =
metricsSnapshotSerde.fromBytes(serializedBytes);
+ assertEquals(metricsSnapshot, deserializedMetricsSnapshot);
+ }
+ @Test
+ public void testSerialize() {
+ SamzaException samzaException = new SamzaException("this is a samza
exception", new RuntimeException("cause"));
+ MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException);
MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
byte[] serializedBytes = metricsSnapshotSerde.toBytes(metricsSnapshot);
+
assertArrayEquals(expectedSeralizedSnapshot(samzaException).getBytes(StandardCharsets.UTF_8),
serializedBytes);
+ }
- MetricsSnapshot deserializedMetricsSnapshot =
metricsSnapshotSerde.fromBytes(serializedBytes);
+ @Test
+ public void testDeserialize() {
+ SamzaException samzaException = new SamzaException("this is a samza
exception", new RuntimeException("cause"));
+ MetricsSnapshot expectedSnapshot = metricsSnapshot(samzaException);
+ MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
+ MetricsSnapshot deserializedSnapshot =
+
metricsSnapshotSerde.fromBytes(expectedSeralizedSnapshot(samzaException).getBytes(StandardCharsets.UTF_8));
+ assertEquals(expectedSnapshot, deserializedSnapshot);
+ }
- Assert.assertTrue("Headers map should be equal",
-
metricsSnapshot.getHeader().getAsMap().equals(deserializedMetricsSnapshot.getHeader().getAsMap()));
+ private static MetricsSnapshot metricsSnapshot(Exception exception) {
+ MetricsHeader metricsHeader =
+ new MetricsHeader("jobName", "i001", "container 0", "test container
ID", "source", "300.14.25.1", "1", "1", 1,
+ 1);
+ BoundedList<DiagnosticsExceptionEvent> boundedList = new
BoundedList<>("exceptions");
+ DiagnosticsExceptionEvent diagnosticsExceptionEvent = new
DiagnosticsExceptionEvent(1, exception, new HashMap<>());
+ boundedList.add(diagnosticsExceptionEvent);
+ Map<String, Map<String, Object>> metricMessage = new HashMap<>();
+ Map<String, Object> samzaContainerMetrics = new HashMap<>();
+ samzaContainerMetrics.put("commit-calls", 0);
+ metricMessage.put("org.apache.samza.container.SamzaContainerMetrics",
samzaContainerMetrics);
+ Map<String, Object> exceptions = new HashMap<>();
+ exceptions.put("exceptions", boundedList.getValues());
+ metricMessage.put("org.apache.samza.exceptions", exceptions);
+ return new MetricsSnapshot(metricsHeader, new Metrics(metricMessage));
+ }
- Assert.assertTrue("Metrics map should be equal",
-
metricsSnapshot.getMetrics().getAsMap().equals(deserializedMetricsSnapshot.getMetrics().getAsMap()));
+ private static String expectedSeralizedSnapshot(Exception exception) {
+ String stackTrace = ExceptionUtils.getStackTrace(exception);
+ // in serialized string, backslash in whitespace characters (e.g. \n, \t)
are escaped
+ String escapedStackTrace = stackTrace.replace("\n", "\\n").replace("\t",
"\\t");
+ return
+
"{\"header\":[\"java.util.HashMap\",{\"job-id\":\"i001\",\"exec-env-container-id\":\"test
container
ID\",\"samza-version\":\"1\",\"job-name\":\"jobName\",\"host\":\"1\",\"reset-time\":[\"java.lang.Long\",1],\"container-name\":\"container
0\",\"source\":\"source\",\"time\":[\"java.lang.Long\",1],\"version\":\"300.14.25.1\"}],\"metrics\":[\"java.util.HashMap\",{\"org.apache.samza.exceptions\":[\"java.util.HashMap\",{\"exceptions\":[\"java.util.Collections$UnmodifiableRandomAccessLi
[...]
+ + escapedStackTrace
+ +
"\",\"mdcMap\":[\"java.util.HashMap\",{}]}]]]}],\"org.apache.samza.container.SamzaContainerMetrics\":[\"java.util.HashMap\",{\"commit-calls\":0}]}]}";
}
}
diff --git
a/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
deleted file mode 100644
index 360e6fa..0000000
---
a/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.HashMap
-import java.util.Map
-
-import org.apache.samza.config.ShellCommandConfig
-import org.apache.samza.metrics.reporter.MetricsSnapshot
-import org.apache.samza.metrics.reporter.MetricsHeader
-import org.apache.samza.metrics.reporter.Metrics
-import org.junit.Assert._
-import org.junit.Ignore
-import org.junit.Test
-
-class TestMetricsSnapshotSerde {
- @Ignore
- @Test
- def testMetricsSerdeShouldSerializeAndDeserializeAMetric {
- val header = new MetricsHeader("test-jobName", "testjobid",
"samza-container-0", "test exec env container id", "test source", "version",
"samzaversion", "host", 1L, 2L)
- val metricsMap = new HashMap[String, Object]()
- metricsMap.put("test2", "foo")
- val metricsGroupMap = new HashMap[String, Map[String, Object]]()
- metricsGroupMap.put("test", metricsMap)
- val metrics = Metrics.fromMap(metricsGroupMap)
- val snapshot = new MetricsSnapshot(header, metrics)
- val serde = new MetricsSnapshotSerde()
- val bytes = serde.toBytes(snapshot)
- assertTrue(serde.fromBytes(bytes).equals(metrics))
- }
-}