This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4f1504fb60b Basic Operator Metric Support For Data Shuffle (GBK and
Combine Per Key) Operators for Samza Runner (#26649)
4f1504fb60b is described below
commit 4f1504fb60bde3685eb07852ad852445c9b8078d
Author: Sanil Jain <[email protected]>
AuthorDate: Thu May 18 17:00:59 2023 -0700
Basic Operator Metric Support For Data Shuffle (GBK and Combine Per Key)
Operators for Samza Runner (#26649)
---
.../runners/samza/metrics/SamzaGBKMetricOp.java | 194 +++++++++++++++++++++
.../runners/samza/metrics/SamzaInputMetricOp.java | 133 --------------
...SamzaOutputMetricOp.java => SamzaMetricOp.java} | 73 +++++---
.../samza/metrics/SamzaMetricOpFactory.java | 18 +-
.../metrics/SamzaTransformMetricRegistry.java | 66 +++++++
.../samza/metrics/SamzaTransformMetrics.java | 18 ++
.../samza/translation/TranslationContext.java | 20 ++-
.../TestSamzaRunnerWithTransformMetrics.java | 125 ++++++++++++-
.../metrics/TestSamzaTransformMetricsRegistry.java | 191 ++++++++++++++++++++
9 files changed, 656 insertions(+), 182 deletions(-)
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaGBKMetricOp.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaGBKMetricOp.java
new file mode 100644
index 00000000000..897c4a05b15
--- /dev/null
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaGBKMetricOp.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.runners.samza.runtime.KeyedTimerData;
+import org.apache.beam.runners.samza.runtime.Op;
+import org.apache.beam.runners.samza.runtime.OpEmitter;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaGBKMetricOp is a {@link Op} that emits & maintains default metrics for
input or output
+ * PCollection for GroupByKey.
+ *
+ * <p>For Input PCollection: It emits the input throughput and maintains avg
input time for input
+ * PCollection per windowId.
+ *
+ * <p>For Output PCollection: It emits the output throughput and maintains avg
output time for
+ * output PCollection per windowId. It is also responsible for emitting
latency metric per windowId
+ * once the watermark passes the end of window timestamp.
+ *
+ * <p>Assumes that {@code SamzaGBKMetricOp#processWatermark(Instant,
OpEmitter)} is exclusive of
+ * {@code SamzaGBKMetricOp#processElement(WindowedValue, OpEmitter)}. Specifically,
the processWatermark
+ * method assumes that no calls to processElement will be made during its
execution, and vice versa.
+ *
+ * @param <T> The type of the elements in the input PCollection.
+ */
+class SamzaGBKMetricOp<T> implements Op<T, T, Void> {
+ private static final Logger LOG =
LoggerFactory.getLogger(SamzaGBKMetricOp.class);
+ // Unique name of the PTransform this MetricOp is associated with
+ private final String transformFullName;
+ private final SamzaTransformMetricRegistry samzaTransformMetricRegistry;
+ // Type of the processing operation
+ private final SamzaMetricOpFactory.OpType opType;
+
+ private final String pValue;
+ // Counters for keeping sum of arrival time and count of elements per
windowId
+ @SuppressFBWarnings("SE_BAD_FIELD")
+ private final ConcurrentHashMap<BoundedWindow, BigInteger>
sumOfTimestampsPerWindowId;
+
+ @SuppressFBWarnings("SE_BAD_FIELD")
+ private final ConcurrentHashMap<BoundedWindow, Long> sumOfCountPerWindowId;
+ // Name of the task, for logging purpose
+ private transient String task;
+
+ @Override
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void open(
+ Config config,
+ Context context,
+ Scheduler<KeyedTimerData<Void>> timerRegistry,
+ OpEmitter<T> emitter) {
+ // for logging / debugging purposes
+ this.task =
context.getTaskContext().getTaskModel().getTaskName().getTaskName();
+ // Register the transform with SamzaTransformMetricRegistry
+ samzaTransformMetricRegistry.register(transformFullName, pValue, context);
+ }
+
+ // Some fields are initialized in open() method, which is called after the
constructor.
+ @SuppressWarnings("initialization.fields.uninitialized")
+ public SamzaGBKMetricOp(
+ String pValue,
+ String transformFullName,
+ SamzaMetricOpFactory.OpType opType,
+ SamzaTransformMetricRegistry samzaTransformMetricRegistry) {
+ this.pValue = pValue;
+ this.transformFullName = transformFullName;
+ this.opType = opType;
+ this.samzaTransformMetricRegistry = samzaTransformMetricRegistry;
+ this.sumOfTimestampsPerWindowId = new ConcurrentHashMap<>();
+ this.sumOfCountPerWindowId = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void processElement(WindowedValue<T> inputElement, OpEmitter<T>
emitter) {
+ // one element can belong to multiple windows
+ for (BoundedWindow windowId : inputElement.getWindows()) {
+ // Atomic updates to counts
+ sumOfCountPerWindowId.compute(
+ windowId,
+ (key, value) -> {
+ value = value == null ? Long.valueOf(0) : value;
+ return ++value;
+ });
+ // Atomic updates to sum of arrival timestamps
+ sumOfTimestampsPerWindowId.compute(
+ windowId,
+ (key, value) -> {
+ value = value == null ? BigInteger.ZERO : value;
+ return value.add(BigInteger.valueOf(System.nanoTime()));
+ });
+ }
+
+ switch (opType) {
+ case INPUT:
+ samzaTransformMetricRegistry
+ .getTransformMetrics()
+ .getTransformInputThroughput(transformFullName)
+ .inc();
+ break;
+ case OUTPUT:
+ samzaTransformMetricRegistry
+ .getTransformMetrics()
+ .getTransformOutputThroughput(transformFullName)
+ .inc();
+ break;
+ }
+ emitter.emitElement(inputElement);
+ }
+
+ @Override
+ public void processWatermark(Instant watermark, OpEmitter<T> emitter) {
+ final List<BoundedWindow> closedWindows = new ArrayList<>();
+ sumOfCountPerWindowId.keySet().stream()
+ .filter(windowId -> watermark.isAfter(windowId.maxTimestamp())) //
window is closed
+ .forEach(
+ windowId -> {
+ // In case if BigInteger overflows for long we only retain the
last 64 bits of the sum
+ long sumOfTimestamps =
+ sumOfTimestampsPerWindowId.get(windowId) != null
+ ? sumOfTimestampsPerWindowId.get(windowId).longValue()
+ : 0L;
+ long count = sumOfCountPerWindowId.get(windowId);
+ closedWindows.add(windowId);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Processing {} Watermark for Transform: {}, WindowId:{},
count: {}, sumOfTimestamps: {}, task: {}",
+ opType,
+ transformFullName,
+ windowId,
+ count,
+ sumOfTimestamps,
+ task);
+ }
+
+ // if the window is closed and there is some data
+ if (sumOfTimestamps > 0 && count > 0) {
+ switch (opType) {
+ case INPUT:
+ // Update the arrival time for the window
+ samzaTransformMetricRegistry.updateArrivalTimeMap(
+ transformFullName, windowId,
Math.floorDiv(sumOfTimestamps, count));
+ break;
+ case OUTPUT:
+ // Compute the latency if there is some data for the window
+ samzaTransformMetricRegistry.emitLatencyMetric(
+ transformFullName, windowId,
Math.floorDiv(sumOfTimestamps, count), task);
+ break;
+ }
+ }
+ });
+
+ // remove the closed windows
+ sumOfCountPerWindowId.keySet().removeAll(closedWindows);
+ sumOfTimestampsPerWindowId.keySet().removeAll(closedWindows);
+
+ // Update the watermark progress for the transform output
+ if (opType == SamzaMetricOpFactory.OpType.OUTPUT) {
+ samzaTransformMetricRegistry
+ .getTransformMetrics()
+ .getTransformWatermarkProgress(transformFullName)
+ .set(watermark.getMillis());
+ }
+
+ emitter.emitWatermark(watermark);
+ }
+}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaInputMetricOp.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaInputMetricOp.java
deleted file mode 100644
index 88360c689f2..00000000000
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaInputMetricOp.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.samza.metrics;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.beam.runners.samza.runtime.KeyedTimerData;
-import org.apache.beam.runners.samza.runtime.Op;
-import org.apache.beam.runners.samza.runtime.OpEmitter;
-import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.samza.config.Config;
-import org.apache.samza.context.Context;
-import org.apache.samza.operators.Scheduler;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * SamzaInputMetricOp emits & maintains default transform metrics for input
PCollection to the
- * transform. It emits the input throughput and maintains avg arrival time for
input PCollection per
- * watermark.
- *
- * <p>Assumes that {@code SamzaInputMetricOp#processWatermark(Instant,
OpEmitter)} is exclusive of
- * {@code SamzaInputMetricOp#processElement(Instant, OpEmitter)}.
Specifically, the processWatermark
- * method assumes that no calls to processElement will be made during its
execution, and vice versa.
- *
- * @param <T> The type of the elements in the input PCollection.
- */
-class SamzaInputMetricOp<T> implements Op<T, T, Void> {
- private static final Logger LOG =
LoggerFactory.getLogger(SamzaInputMetricOp.class);
-
- // Unique name of the PTransform this MetricOp is associated with
- protected final String transformFullName;
- protected final SamzaTransformMetricRegistry samzaTransformMetricRegistry;
- // Name or identifier of the PCollection which PTransform is processing
- protected final String pValue;
- // Counters to maintain avg arrival time per watermark for input PCollection.
- private final AtomicLong count;
- private final AtomicReference<BigInteger> sumOfTimestamps;
- // List of input PValue(s) for all PCollections processing the PTransform
- protected transient List<String> transformInputs;
- // List of output PValue(s) for all PCollections processing the PTransform
- protected transient List<String> transformOutputs;
- // Name of the task, for logging purpose
- protected transient String task;
-
- // Some fields are initialized in open() method, which is called after the
constructor.
- @SuppressWarnings("initialization.fields.uninitialized")
- public SamzaInputMetricOp(
- String pValue,
- String transformFullName,
- SamzaTransformMetricRegistry samzaTransformMetricRegistry) {
- this.transformFullName = transformFullName;
- this.samzaTransformMetricRegistry = samzaTransformMetricRegistry;
- this.pValue = pValue;
- this.count = new AtomicLong(0L);
- this.sumOfTimestamps = new AtomicReference<>(BigInteger.ZERO);
- }
-
- @Override
- @SuppressWarnings({"rawtypes", "unchecked"})
- public void open(
- Config config,
- Context context,
- Scheduler<KeyedTimerData<Void>> timerRegistry,
- OpEmitter<T> emitter) {
- final Map.Entry<List<String>, List<String>> transformInputOutput =
- PipelineJsonRenderer.getTransformIOMap(config).get(transformFullName);
- this.transformInputs =
- transformInputOutput != null ? transformInputOutput.getKey() : new
ArrayList();
- this.transformOutputs =
- transformInputOutput != null ? transformInputOutput.getValue() : new
ArrayList();
- // for logging / debugging purposes
- this.task =
context.getTaskContext().getTaskModel().getTaskName().getTaskName();
- // Register the transform with SamzaTransformMetricRegistry
- samzaTransformMetricRegistry.register(transformFullName, pValue, context);
- }
-
- @Override
- public void processElement(WindowedValue<T> inputElement, OpEmitter<T>
emitter) {
- count.incrementAndGet();
- sumOfTimestamps.updateAndGet(sum ->
sum.add(BigInteger.valueOf(System.nanoTime())));
- samzaTransformMetricRegistry
- .getTransformMetrics()
- .getTransformInputThroughput(transformFullName)
- .inc();
- emitter.emitElement(inputElement);
- }
-
- @Override
- public void processWatermark(Instant watermark, OpEmitter<T> emitter) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Processing Input Watermark for Transform: {} Count: {}
SumOfTimestamps: {} for Watermark: {} for Task: {}",
- transformFullName,
- count.get(),
- sumOfTimestamps.get().longValue(),
- watermark.getMillis(),
- task);
- }
- // if there is no input data then counters will be zero and only watermark
will progress
- if (count.get() > 0) {
- // if BigInt.longValue is out of range for long then only the low-order
64 bits are retained
- long avg = Math.floorDiv(sumOfTimestamps.get().longValue(), count.get());
- samzaTransformMetricRegistry.updateArrivalTimeMap(
- transformFullName, pValue, watermark.getMillis(), avg);
- }
- // reset all counters
- count.set(0L);
- sumOfTimestamps.set(BigInteger.ZERO);
- emitter.emitWatermark(watermark);
- }
-}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaOutputMetricOp.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOp.java
similarity index 72%
rename from
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaOutputMetricOp.java
rename to
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOp.java
index 1d34ff4c703..b624006fd98 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaOutputMetricOp.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOp.java
@@ -38,44 +38,47 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * SamzaOutputMetricOp is a metric Op that emits & maintains default transform
metrics for output
- * PCollection to the transform. It emits the output throughput and maintains
avg arrival time for
- * output PCollection per watermark.
+ * SamzaMetricOp is a metric Op that emits & maintains default transform
metrics for the input &
+ * output PCollections of a non data-shuffle transform. It emits the input or output
throughput and
+ * maintains avg arrival time for input & output PCollection per watermark.
*
- * <p>Assumes that {@code SamzaOutputMetricOp#processWatermark(Instant,
OpEmitter)} is exclusive of
- * {@code SamzaOutputMetricOp#processElement(Instant, OpEmitter)}.
Specifically, the
- * processWatermark method assumes that no calls to processElement will be
made during its
- * execution, and vice versa.
+ * <p>Assumes that {@code SamzaMetricOp#processWatermark(Instant, OpEmitter)}
is exclusive of {@code
+ * SamzaMetricOp#processElement(WindowedValue, OpEmitter)}. Specifically, the
processWatermark method
+ * assumes that no calls to processElement will be made during its execution,
and vice versa.
*
* @param <T> The type of the elements in the output PCollection.
*/
-class SamzaOutputMetricOp<T> implements Op<T, T, Void> {
+class SamzaMetricOp<T> implements Op<T, T, Void> {
// Unique name of the PTransform this MetricOp is associated with
- protected final String transformFullName;
- protected final SamzaTransformMetricRegistry samzaTransformMetricRegistry;
+ private final String transformFullName;
+ private final SamzaTransformMetricRegistry samzaTransformMetricRegistry;
// Name or identifier of the PCollection which PTransform is processing
- protected final String pValue;
+ private final String pValue;
// Counters for output throughput
private final AtomicLong count;
private final AtomicReference<BigInteger> sumOfTimestamps;
+ // Type of the PTransform input or output
+ private final SamzaMetricOpFactory.OpType opType;
// List of input PValue(s) for all PCollections processing the PTransform
- protected transient List<String> transformInputs;
+ private transient List<String> transformInputs;
// List of output PValue(s) for all PCollections processing the PTransform
- protected transient List<String> transformOutputs;
+ private transient List<String> transformOutputs;
// Name of the task, for logging purpose
- protected transient String task;
+ private transient String task;
- private static final Logger LOG =
LoggerFactory.getLogger(SamzaOutputMetricOp.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(SamzaMetricOp.class);
// Some fields are initialized in open() method, which is called after the
constructor.
@SuppressWarnings("initialization.fields.uninitialized")
- public SamzaOutputMetricOp(
+ public SamzaMetricOp(
@NonNull String pValue,
@NonNull String transformFullName,
+ SamzaMetricOpFactory.OpType opType,
@NonNull SamzaTransformMetricRegistry samzaTransformMetricRegistry) {
this.transformFullName = transformFullName;
this.samzaTransformMetricRegistry = samzaTransformMetricRegistry;
this.pValue = pValue;
+ this.opType = opType;
this.count = new AtomicLong(0L);
this.sumOfTimestamps = new AtomicReference<>(BigInteger.ZERO);
}
@@ -104,10 +107,20 @@ class SamzaOutputMetricOp<T> implements Op<T, T, Void> {
// update counters for timestamps
count.incrementAndGet();
sumOfTimestamps.updateAndGet(sum ->
sum.add(BigInteger.valueOf(System.nanoTime())));
- samzaTransformMetricRegistry
- .getTransformMetrics()
- .getTransformOutputThroughput(transformFullName)
- .inc();
+ switch (opType) {
+ case INPUT:
+ samzaTransformMetricRegistry
+ .getTransformMetrics()
+ .getTransformInputThroughput(transformFullName)
+ .inc();
+ break;
+ case OUTPUT:
+ samzaTransformMetricRegistry
+ .getTransformMetrics()
+ .getTransformOutputThroughput(transformFullName)
+ .inc();
+ break;
+ }
emitter.emitElement(inputElement);
}
@@ -131,16 +144,20 @@ class SamzaOutputMetricOp<T> implements Op<T, T, Void> {
// Update MetricOp Registry with avg arrival for the pValue
samzaTransformMetricRegistry.updateArrivalTimeMap(
transformFullName, pValue, watermark.getMillis(), avg);
- // compute & emit the latency metric
- samzaTransformMetricRegistry.emitLatencyMetric(
- transformFullName, transformInputs, transformOutputs,
watermark.getMillis(), task);
+ if (opType == SamzaMetricOpFactory.OpType.OUTPUT) {
+ // compute & emit the latency metric if the opType is OUTPUT
+ samzaTransformMetricRegistry.emitLatencyMetric(
+ transformFullName, transformInputs, transformOutputs,
watermark.getMillis(), task);
+ }
}
- // update output watermark progress metric
- samzaTransformMetricRegistry
- .getTransformMetrics()
- .getTransformWatermarkProgress(transformFullName)
- .set(watermark.getMillis());
+ if (opType == SamzaMetricOpFactory.OpType.OUTPUT) {
+ // update output watermark progress metric
+ samzaTransformMetricRegistry
+ .getTransformMetrics()
+ .getTransformWatermarkProgress(transformFullName)
+ .set(watermark.getMillis());
+ }
// reset all counters
count.set(0L);
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOpFactory.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOpFactory.java
index 9d4f4013b0f..503e9049b98 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOpFactory.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOpFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.samza.metrics;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.samza.runtime.Op;
import org.checkerframework.checker.nullness.qual.NonNull;
@@ -45,6 +46,7 @@ public class SamzaMetricOpFactory {
/**
* Create a {@link Op} for default transform metric computation.
*
+ * @param urn URN of the PTransform for which metric Op is created
* @param pValue name of the PCollection metric Op is processing
* @param transformName name of the PTransform for which metric Op is created
* @param opType type of the metric
@@ -53,17 +55,19 @@ public class SamzaMetricOpFactory {
* @return a {@link Op} for default transform metric computation
*/
public static @NonNull <T> Op<T, T, Void> createMetricOp(
+ @NonNull String urn,
@NonNull String pValue,
@NonNull String transformName,
@NonNull OpType opType,
@NonNull SamzaTransformMetricRegistry samzaTransformMetricRegistry) {
- switch (opType) {
- case INPUT:
- return new SamzaInputMetricOp(pValue, transformName,
samzaTransformMetricRegistry);
- case OUTPUT:
- return new SamzaOutputMetricOp(pValue, transformName,
samzaTransformMetricRegistry);
- default:
- throw new IllegalArgumentException("Unknown OpType: " + opType);
+ if (isDataShuffleTransform(urn)) {
+ return new SamzaGBKMetricOp<>(pValue, transformName, opType,
samzaTransformMetricRegistry);
}
+ return new SamzaMetricOp<>(pValue, transformName, opType,
samzaTransformMetricRegistry);
+ }
+
+ private static boolean isDataShuffleTransform(String urn) {
+ return urn.equals(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN)
+ || urn.equals(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN);
}
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetricRegistry.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetricRegistry.java
index 1dd0639e256..e4849c31fa6 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetricRegistry.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetricRegistry.java
@@ -17,11 +17,13 @@
*/
package org.apache.beam.runners.samza.metrics;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.samza.context.Context;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -42,12 +44,17 @@ public class SamzaTransformMetricRegistry implements
Serializable {
// TransformName -> PValue for pCollection -> Map<WatermarkId,
AvgArrivalTime>
private final ConcurrentHashMap<String, ConcurrentHashMap<String,
ConcurrentHashMap<Long, Long>>>
avgArrivalTimeMap;
+ // TransformName -> Map<WindowId, AvgArrivalTime>
+ @SuppressFBWarnings("SE_BAD_FIELD")
+ private final ConcurrentHashMap<String, ConcurrentHashMap<BoundedWindow,
Long>>
+ avgArrivalTimeMapForGbk;
// Per Transform Metrics for each primitive transform
private final SamzaTransformMetrics transformMetrics;
public SamzaTransformMetricRegistry() {
this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+ this.avgArrivalTimeMapForGbk = new ConcurrentHashMap<>();
this.transformMetrics = new SamzaTransformMetrics();
}
@@ -55,6 +62,7 @@ public class SamzaTransformMetricRegistry implements
Serializable {
SamzaTransformMetricRegistry(SamzaTransformMetrics samzaTransformMetrics) {
this.transformMetrics = samzaTransformMetrics;
this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+ this.avgArrivalTimeMapForGbk = new ConcurrentHashMap<>();
}
public void register(String transformFullName, String pValue, Context ctx) {
@@ -62,6 +70,7 @@ public class SamzaTransformMetricRegistry implements
Serializable {
// initialize the map for the transform
avgArrivalTimeMap.putIfAbsent(transformFullName, new
ConcurrentHashMap<>());
avgArrivalTimeMap.get(transformFullName).putIfAbsent(pValue, new
ConcurrentHashMap<>());
+ avgArrivalTimeMapForGbk.putIfAbsent(transformFullName, new
ConcurrentHashMap<>());
}
public SamzaTransformMetrics getTransformMetrics() {
@@ -80,6 +89,49 @@ public class SamzaTransformMetricRegistry implements
Serializable {
}
}
+ public void updateArrivalTimeMap(String transformName, BoundedWindow
windowId, long avg) {
+ ConcurrentHashMap<BoundedWindow, Long> avgArrivalTimeMapForTransform =
+ avgArrivalTimeMapForGbk.get(transformName);
+ if (avgArrivalTimeMapForTransform != null) {
+ avgArrivalTimeMapForTransform.put(windowId, avg);
+ }
+ }
+
+ @SuppressWarnings("nullness")
+ public void emitLatencyMetric(
+ String transformName, BoundedWindow windowId, long avgArrivalEndTime,
String taskName) {
+ Long avgArrivalStartTime =
+ avgArrivalTimeMapForGbk.get(transformName) != null
+ ? avgArrivalTimeMapForGbk.get(transformName).remove(windowId)
+ : null;
+
+ if (avgArrivalStartTime == null || avgArrivalStartTime == 0 ||
avgArrivalEndTime == 0) {
+ LOG.debug(
+ "Failure to Emit Metric for Transform: {}, Start-Time: {} or
End-Time: {} found is 0/null for windowId: {}, task: {}",
+ transformName,
+ avgArrivalStartTime,
+ avgArrivalEndTime,
+ windowId,
+ taskName);
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Success Emit Metric for Transform: {}, window: {} for task: {}",
+ transformName,
+ windowId,
+ taskName);
+ }
+ transformMetrics
+ .getTransformLatencyMetric(transformName)
+ .update(avgArrivalEndTime - avgArrivalStartTime);
+
+ transformMetrics
+ .getTransformCacheSize(transformName)
+ .set((long) avgArrivalTimeMapForGbk.get(transformName).size());
+ }
+
// Checker framework bug:
https://github.com/typetools/checker-framework/issues/979
@SuppressWarnings("return")
public void emitLatencyMetric(
@@ -126,6 +178,14 @@ public class SamzaTransformMetricRegistry implements
Serializable {
final long endTime = Collections.max(outputPValuesAvgArrivalTimes);
final long latency = endTime - startTime;
transformMetrics.getTransformLatencyMetric(transformName).update(latency);
+
+ transformMetrics
+ .getTransformCacheSize(transformName)
+ .set(
+ avgArrivalTimeMapForTransform.values().stream()
+ .mapToLong(ConcurrentHashMap::size)
+ .sum());
+
LOG.debug(
"Success Emit Metric Transform: {} for watermark: {} for task: {}",
transformName,
@@ -139,4 +199,10 @@ public class SamzaTransformMetricRegistry implements
Serializable {
String transformName) {
return avgArrivalTimeMap.get(transformName);
}
+
+ @VisibleForTesting
+ @Nullable
+ ConcurrentHashMap<BoundedWindow, Long> getAverageArrivalTimeMapForGBK(String
transformName) {
+ return avgArrivalTimeMapForGbk.get(transformName);
+ }
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetrics.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetrics.java
index 13fbf6007e5..49da3effd8c 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetrics.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetrics.java
@@ -43,6 +43,8 @@ public class SamzaTransformMetrics implements Serializable {
private static final String TRANSFORM_IP_THROUGHPUT = "num-input-messages";
private static final String TRANSFORM_OP_THROUGHPUT = "num-output-messages";
+ private static final String TRANSFORM_ARRIVAL_TIME_CACHE_SIZE =
"in-mem-cache-size";
+
// Transform name to metric maps
@SuppressFBWarnings("SE_BAD_FIELD")
private final Map<String, Timer> transformLatency;
@@ -56,11 +58,15 @@ public class SamzaTransformMetrics implements Serializable {
@SuppressFBWarnings("SE_BAD_FIELD")
private final Map<String, Counter> transformOutputThroughPut;
+ @SuppressFBWarnings("SE_BAD_FIELD")
+ private final Map<String, Gauge<Long>> transformCacheSize;
+
public SamzaTransformMetrics() {
this.transformLatency = new ConcurrentHashMap<>();
this.transformOutputThroughPut = new ConcurrentHashMap<>();
this.transformWatermarkProgress = new ConcurrentHashMap<>();
this.transformInputThroughput = new ConcurrentHashMap<>();
+ this.transformCacheSize = new ConcurrentHashMap<>();
}
public void register(String transformName, Context ctx) {
@@ -91,6 +97,14 @@ public class SamzaTransformMetrics implements Serializable {
transformName,
metricsRegistry.newCounter(
GROUP, getMetricNameWithPrefix(TRANSFORM_IP_THROUGHPUT,
transformName)));
+ transformCacheSize.putIfAbsent(
+ transformName,
+ ctx.getTaskContext()
+ .getTaskMetricsRegistry()
+ .newGauge(
+ GROUP,
+ getMetricNameWithPrefix(TRANSFORM_ARRIVAL_TIME_CACHE_SIZE,
transformName),
+ 0L));
}
public Timer getTransformLatencyMetric(String transformName) {
@@ -105,6 +119,10 @@ public class SamzaTransformMetrics implements Serializable
{
return transformOutputThroughPut.get(transformName);
}
+ public Gauge<Long> getTransformCacheSize(String transformName) {
+ return transformCacheSize.get(transformName);
+ }
+
public Gauge<Long> getTransformWatermarkProgress(String transformName) {
return transformWatermarkProgress.get(transformName);
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
index 09ed68457b3..e3e4bb46fce 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
@@ -182,13 +182,10 @@ public class TranslationContext {
TransformHierarchy.Node node,
SamzaMetricOpFactory.OpType opType) {
final Boolean enableTransformMetrics =
getPipelineOptions().getEnableTransformMetrics();
- final String urn = PTransformTranslation.urnForTransformOrNull(transform);
+ final String transformURN =
PTransformTranslation.urnForTransformOrNull(transform);
- // skip attach transform if user override is false or transform is GBK
- if (!enableTransformMetrics
- || urn == null
- || urn.equals(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN)
- || urn.equals(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN)) {
+ // skip attach transform if user override is false or transform is not
registered
+ if (!enableTransformMetrics || transformURN == null) {
return;
}
@@ -198,11 +195,22 @@ public class TranslationContext {
}
for (PValue pValue : getPValueForTransform(opType, transform, node)) {
+ // skip attach transform if pValue is not registered i.e. if not
translated with a samza
+ // translator
+ if (!messsageStreams.containsKey(pValue)) {
+ LOG.debug(
+ "Skip attach transform metric op for pValue: {} for transform: {}",
+ pValue,
+ getTransformFullName());
+ continue;
+ }
+
// add another step for default metric computation
getMessageStream(pValue)
.flatMapAsync(
OpAdapter.adapt(
SamzaMetricOpFactory.createMetricOp(
+ transformURN,
pValue.getName(),
getTransformFullName(),
opType,
diff --git
a/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaRunnerWithTransformMetrics.java
b/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaRunnerWithTransformMetrics.java
index 60a1a20d300..a605f4fc27b 100644
---
a/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaRunnerWithTransformMetrics.java
+++
b/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaRunnerWithTransformMetrics.java
@@ -35,10 +35,13 @@ import
org.apache.beam.runners.samza.util.InMemoryMetricsReporter;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -49,6 +52,7 @@ import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.Metric;
import org.apache.samza.metrics.Timer;
import org.apache.samza.system.WatermarkMessage;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
@@ -63,7 +67,7 @@ public class TestSamzaRunnerWithTransformMetrics {
TestSamzaRunner testSamzaRunner = TestSamzaRunner.fromOptions(options);
Pipeline pipeline = Pipeline.create(options);
// Create a pipeline
- PCollection<KV<String, Integer>> output =
+ PCollection<KV<String, Long>> output =
pipeline
.apply(
"Mock data",
@@ -73,9 +77,12 @@ public class TestSamzaRunnerWithTransformMetrics {
KV.of("hello", KV.of("b", 42)),
KV.of("hello", KV.of("c", 12))))
.apply("Filter valid keys", Filter.by(x ->
x.getKey().equals("hello")))
- .apply(Values.create());
+ .apply(Values.create())
+ .apply("Fixed-window",
Window.into(FixedWindows.of(Duration.standardSeconds(10))))
+ .apply(Count.perKey());
+
// check pipeline is working fine
- PAssert.that(output).containsInAnyOrder(KV.of("a", 97), KV.of("b", 42),
KV.of("c", 12));
+ PAssert.that(output).containsInAnyOrder(KV.of("a", 1L), KV.of("b", 1L),
KV.of("c", 1L));
testSamzaRunner.run(pipeline);
Map<String, Metric> pTransformContainerMetrics =
@@ -122,15 +129,31 @@ public class TestSamzaRunnerWithTransformMetrics {
pTransformContainerMetrics.get(
"Values_Values_Map_ParMultiDo_Anonymous_-num-output-messages"))
.getCount());
+ assertEquals(
+ 3,
+ ((Counter)
pTransformContainerMetrics.get("Fixed_window_Window_Assign-num-input-messages"))
+ .getCount());
+ assertEquals(
+ 3,
+ ((Counter)
pTransformContainerMetrics.get("Fixed_window_Window_Assign-num-output-messages"))
+ .getCount());
+ assertEquals(
+ 3,
+ ((Counter)
pTransformContainerMetrics.get("Combine_perKey_Count_-num-input-messages"))
+ .getCount());
+ assertEquals(
+ 3,
+ ((Counter)
pTransformContainerMetrics.get("Combine_perKey_Count_-num-output-messages"))
+ .getCount());
// Throughput Metrics are per container by default
assertNotNull(
pTransformContainerMetrics.get(
"Filter_valid_keys_ParDo_Anonymous__ParMultiDo_Anonymous_-handle-message-ns"));
- // One message is dropped from filter
assertNotNull(
pTransformContainerMetrics.get(
"Values_Values_Map_ParMultiDo_Anonymous_-handle-message-ns"));
+
assertNotNull(pTransformContainerMetrics.get("Combine_perKey_Count_-handle-message-ns"));
// Watermark Metrics are Per task by default
assertTrue(
@@ -143,6 +166,7 @@ public class TestSamzaRunnerWithTransformMetrics {
"Filter_valid_keys_ParDo_Anonymous__ParMultiDo_Anonymous_-output-watermark-ms"));
assertNotNull(
pTransformTaskMetrics.get("Values_Values_Map_ParMultiDo_Anonymous_-output-watermark-ms"));
+
assertNotNull(pTransformTaskMetrics.get("Combine_perKey_Count_-output-watermark-ms"));
}
@Test
@@ -161,6 +185,7 @@ public class TestSamzaRunnerWithTransformMetrics {
Counter inputCounter = new Counter("filter-input-counter");
Counter outputCounter = new Counter("filter-output-counter");
Gauge<Long> watermarkProgress = new Gauge<>("filter-output-watermark", 0L);
+ Gauge<Long> cacheSize = new Gauge<>("filter-arrival-time-cache-size", 0L);
Timer latency = new Timer("filter-latency");
SamzaTransformMetrics samzaTransformMetrics =
mock(SamzaTransformMetrics.class);
@@ -170,14 +195,19 @@ public class TestSamzaRunnerWithTransformMetrics {
when(samzaTransformMetrics.getTransformWatermarkProgress("filter"))
.thenReturn(watermarkProgress);
when(samzaTransformMetrics.getTransformLatencyMetric("filter")).thenReturn(latency);
+
when(samzaTransformMetrics.getTransformCacheSize("filter")).thenReturn(cacheSize);
SamzaTransformMetricRegistry samzaTransformMetricRegistry =
spy(new SamzaTransformMetricRegistry(samzaTransformMetrics));
samzaTransformMetricRegistry.register("filter", "dummy-pvalue.in",
mock(Context.class));
samzaTransformMetricRegistry.register("filter", "dummy-pvalue.out",
mock(Context.class));
- SamzaInputMetricOp<String> inputMetricOp =
- new SamzaInputMetricOp<>("dummy-pvalue.in", "filter",
samzaTransformMetricRegistry);
+ SamzaMetricOp<String> inputMetricOp =
+ new SamzaMetricOp<>(
+ "dummy-pvalue.in",
+ "filter",
+ SamzaMetricOpFactory.OpType.INPUT,
+ samzaTransformMetricRegistry);
inputMetricOp.processElement(windowedValue, opEmitter);
inputMetricOp.processElement(windowedValue2, opEmitter);
@@ -192,8 +222,12 @@ public class TestSamzaRunnerWithTransformMetrics {
.get("dummy-pvalue.in")
.containsKey(watermarkMessage.getTimestamp()));
- SamzaOutputMetricOp<String> outputMetricOp =
- new SamzaOutputMetricOp<>("dummy-pvalue.out", "filter",
samzaTransformMetricRegistry);
+ SamzaMetricOp<String> outputMetricOp =
+ new SamzaMetricOp<>(
+ "dummy-pvalue.out",
+ "filter",
+ SamzaMetricOpFactory.OpType.OUTPUT,
+ samzaTransformMetricRegistry);
outputMetricOp.init(ImmutableList.of("dummy-pvalue.in"),
ImmutableList.of("dummy-pvalue.out"));
outputMetricOp.processElement(windowedValue, opEmitter);
@@ -206,5 +240,80 @@ public class TestSamzaRunnerWithTransformMetrics {
assertEquals(watermarkMessage.getTimestamp(),
watermarkProgress.getValue().longValue());
// Latency must be positive
assertTrue(latency.getSnapshot().getAverage() > 0);
+ // Cache size must be 0
+ assertEquals(0, cacheSize.getValue().intValue());
+ }
+
+  /**
+   * Runs one {@link SamzaGBKMetricOp} as the input side and one as the output side of the
+   * "Count-perKey" transform, feeding each two elements and a watermark, and checks the
+   * throughput counters, the watermark gauge, the latency timer and the cache-size gauge.
+   */
+  @Test
+  public void testSamzaInputAndOutputGBKMetricOp() {
+    final String transformName = "Count-perKey";
+    final String inputPValue = "window-assign.in";
+    final String outputPValue = "window-assign.out";
+
+    // Two elements in the global window plus a watermark at the maximum timestamp.
+    final WindowedValue<String> firstElement =
+        WindowedValue.timestampedValueInGlobalWindow("value-1", new Instant());
+    final WindowedValue<String> secondElement =
+        WindowedValue.timestampedValueInGlobalWindow("value-2", new Instant());
+    final WatermarkMessage watermarkMessage =
+        new WatermarkMessage(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    final Instant watermark = new Instant(watermarkMessage.getTimestamp());
+
+    // The emitter is a mock whose emit calls do nothing.
+    OpEmitter<String> emitter = mock(OpEmitter.class);
+    doNothing().when(emitter).emitElement(any());
+    doNothing().when(emitter).emitWatermark(any());
+
+    // Real Samza metric objects, handed out by a mocked SamzaTransformMetrics.
+    Counter numInputMessages = new Counter("Count-perKey-input-counter");
+    Counter numOutputMessages = new Counter("Count-perKey-output-counter");
+    Gauge<Long> outputWatermark = new Gauge<>("Count-perKey-output-watermark", 0L);
+    Timer handleLatency = new Timer("Count-perKey-latency");
+    Gauge<Long> arrivalTimeCacheSize = new Gauge<>("Count-perKey-arrival-time-cache-size", 0L);
+
+    SamzaTransformMetrics metrics = mock(SamzaTransformMetrics.class);
+    doNothing().when(metrics).register(any(), any());
+    when(metrics.getTransformInputThroughput(transformName)).thenReturn(numInputMessages);
+    when(metrics.getTransformOutputThroughput(transformName)).thenReturn(numOutputMessages);
+    when(metrics.getTransformWatermarkProgress(transformName)).thenReturn(outputWatermark);
+    when(metrics.getTransformLatencyMetric(transformName)).thenReturn(handleLatency);
+    when(metrics.getTransformCacheSize(transformName)).thenReturn(arrivalTimeCacheSize);
+
+    SamzaTransformMetricRegistry registry = spy(new SamzaTransformMetricRegistry(metrics));
+    registry.register(transformName, inputPValue, mock(Context.class));
+    registry.register(transformName, outputPValue, mock(Context.class));
+
+    // Input side: both elements, then the watermark.
+    SamzaGBKMetricOp<String> inputOp =
+        new SamzaGBKMetricOp<>(
+            inputPValue, transformName, SamzaMetricOpFactory.OpType.INPUT, registry);
+    inputOp.processElement(firstElement, emitter);
+    inputOp.processElement(secondElement, emitter);
+    inputOp.processWatermark(watermark, emitter);
+
+    // Both elements must be counted as input
+    assertEquals(2, numInputMessages.getCount());
+    // Exactly one average arrival time must be recorded: both elements share the global window
+    assertEquals(1, registry.getAverageArrivalTimeMapForGBK(transformName).size());
+
+    // Output side: the same two elements, then the same watermark.
+    SamzaGBKMetricOp<String> outputOp =
+        new SamzaGBKMetricOp<>(
+            outputPValue, transformName, SamzaMetricOpFactory.OpType.OUTPUT, registry);
+    outputOp.processElement(firstElement, emitter);
+    outputOp.processElement(secondElement, emitter);
+    outputOp.processWatermark(watermark, emitter);
+
+    // Both elements must be counted as output
+    assertEquals(2, numOutputMessages.getCount());
+    // The watermark gauge must carry the processed watermark
+    assertEquals(watermarkMessage.getTimestamp(), outputWatermark.getValue().longValue());
+    // Latency must be positive
+    assertTrue(handleLatency.getSnapshot().getAverage() > 0);
+    // Cache size must be 0
+    assertEquals(0, arrivalTimeCacheSize.getValue().intValue());
   }
}
diff --git
a/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaTransformMetricsRegistry.java
b/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaTransformMetricsRegistry.java
new file mode 100644
index 00000000000..f8dacc3aec6
--- /dev/null
+++
b/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaTransformMetricsRegistry.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.system.WatermarkMessage;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link SamzaTransformMetricRegistry}: recording of average arrival times and
+ * emission of the latency metric, once keyed by pValue and watermark (non-shuffle operators) and
+ * once keyed by window (data-shuffle operators such as GBK / Combine.perKey).
+ */
+public class TestSamzaTransformMetricsRegistry {
+
+  private static final String TASK_NAME = "task 0";
+
+  /**
+   * Records arrival times 100 ms apart for the input and output pValue of one watermark, emits
+   * the latency metric and checks that the timer reports 100 and the recorded entries are gone.
+   */
+  @Test
+  public void testSamzaTransformMetricsRegistryForNonShuffleOperators() {
+    final String transform = "filter";
+    final String inputPValue = "dummy-pvalue.in";
+    final String outputPValue = "dummy-pvalue.out";
+    final WatermarkMessage watermarkMessage =
+        new WatermarkMessage(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    final long watermark = watermarkMessage.getTimestamp();
+    final long avgInputArrivalTime = System.currentTimeMillis();
+    final long avgOutputArrivalTime = avgInputArrivalTime + 100;
+
+    final Timer latency = new Timer("filter-latency");
+    final Gauge<Long> cacheSize = new Gauge<>("filter-cache-size", 0L);
+    final SamzaTransformMetricRegistry registry = newRegistry(transform, latency, cacheSize);
+    registry.register(transform, inputPValue, mock(Context.class));
+    registry.register(transform, outputPValue, mock(Context.class));
+
+    // Update the avg arrival time
+    registry.updateArrivalTimeMap(transform, inputPValue, watermark, avgInputArrivalTime);
+    registry.updateArrivalTimeMap(transform, outputPValue, watermark, avgOutputArrivalTime);
+
+    // Check the avg arrival time is updated. The containsKey checks come first so that a missing
+    // entry fails as an assertion instead of a NullPointerException on longValue().
+    assertTrue(
+        "no arrival time recorded for " + inputPValue,
+        registry.getAverageArrivalTimeMap(transform).get(inputPValue).containsKey(watermark));
+    assertEquals(
+        avgInputArrivalTime,
+        registry
+            .getAverageArrivalTimeMap(transform)
+            .get(inputPValue)
+            .get(watermark)
+            .longValue());
+    assertTrue(
+        "no arrival time recorded for " + outputPValue,
+        registry.getAverageArrivalTimeMap(transform).get(outputPValue).containsKey(watermark));
+    assertEquals(
+        avgOutputArrivalTime,
+        registry
+            .getAverageArrivalTimeMap(transform)
+            .get(outputPValue)
+            .get(watermark)
+            .longValue());
+
+    // Emit the latency metric
+    registry.emitLatencyMetric(
+        transform,
+        ImmutableList.of(inputPValue),
+        ImmutableList.of(outputPValue),
+        watermark,
+        TASK_NAME);
+
+    // Check the latency metric is updated: output arrival - input arrival = 100
+    assertEquals(100.0, latency.getSnapshot().getAverage(), 0.0);
+
+    // Check the avg arrival time is cleared
+    assertFalse(
+        registry.getAverageArrivalTimeMap(transform).get(inputPValue).containsKey(watermark));
+    assertFalse(
+        registry.getAverageArrivalTimeMap(transform).get(outputPValue).containsKey(watermark));
+    // Cache size must be 0
+    assertEquals(0, cacheSize.getValue().intValue());
+  }
+
+  /**
+   * Records arrival times for two windows of a registered transform, emits a latency of 1000 for
+   * each and checks the timer average and the cleanup; then repeats the calls for a transform
+   * that was never registered and checks that nothing changes.
+   */
+  @Test
+  public void testSamzaTransformMetricsRegistryForDataShuffleOperators() {
+    final String transform = "Count.perKey";
+    final String unknownTransform = "random-transform";
+
+    final Timer latency = new Timer("Count-perKey-latency");
+    final Gauge<Long> cacheSize = new Gauge<>("Count-perKey-cache-size", 0L);
+    final SamzaTransformMetricRegistry registry = newRegistry(transform, latency, cacheSize);
+    registry.register(transform, "window-assign.in", mock(Context.class));
+    registry.register(transform, "window-assign.out", mock(Context.class));
+
+    final BoundedWindow first = windowEndingAt(2048L);
+    final BoundedWindow second = windowEndingAt(689743L);
+
+    // Update the avg arrival time
+    registry.updateArrivalTimeMap(transform, first, 1048L);
+    registry.updateArrivalTimeMap(transform, second, 4897L);
+
+    // Check the avg arrival time is updated. The containsKey checks come first so that a missing
+    // entry fails as an assertion instead of a NullPointerException on longValue().
+    assertTrue(
+        "no arrival time recorded for the first window",
+        registry.getAverageArrivalTimeMapForGBK(transform).containsKey(first));
+    assertEquals(
+        1048L, registry.getAverageArrivalTimeMapForGBK(transform).get(first).longValue());
+    assertTrue(
+        "no arrival time recorded for the second window",
+        registry.getAverageArrivalTimeMapForGBK(transform).containsKey(second));
+    assertEquals(
+        4897L, registry.getAverageArrivalTimeMapForGBK(transform).get(second).longValue());
+
+    // Emit the latency metric: 2048 - 1048 and 5897 - 4897 are both 1000
+    registry.emitLatencyMetric(transform, first, 2048L, TASK_NAME);
+    registry.emitLatencyMetric(transform, second, 5897L, TASK_NAME);
+
+    // Check the latency metric is updated
+    assertEquals(1000.0, latency.getSnapshot().getAverage(), 0.0);
+
+    // Check the avg arrival time is cleared
+    assertFalse(registry.getAverageArrivalTimeMapForGBK(transform).containsKey(first));
+    assertFalse(registry.getAverageArrivalTimeMapForGBK(transform).containsKey(second));
+
+    // Failure testing: updates for a transform that was never registered.
+    // NOTE(review): the call is issued twice with identical arguments, as in the original test;
+    // presumably to show that repeated calls are harmless as well - confirm.
+    registry.updateArrivalTimeMap(unknownTransform, first, 1048L);
+    registry.updateArrivalTimeMap(unknownTransform, first, 1048L);
+    // No data updated
+    assertFalse(registry.getAverageArrivalTimeMapForGBK(transform).containsKey(first));
+    assertNull(registry.getAverageArrivalTimeMapForGBK(unknownTransform));
+    // Emit the bad latency metric
+    registry.emitLatencyMetric(unknownTransform, first, 2048L, TASK_NAME);
+    registry.emitLatencyMetric(unknownTransform, first, 0, TASK_NAME);
+    // Check the latency metric is unchanged
+    assertEquals(1000.0, latency.getSnapshot().getAverage(), 0.0);
+    // Cache size must be 0
+    assertEquals(0, cacheSize.getValue().intValue());
+  }
+
+  /**
+   * Builds a spied registry on top of a mocked {@link SamzaTransformMetrics} that hands out the
+   * given latency timer and cache-size gauge for {@code transformName}.
+   */
+  private static SamzaTransformMetricRegistry newRegistry(
+      String transformName, Timer latency, Gauge<Long> cacheSize) {
+    final SamzaTransformMetrics samzaTransformMetrics = mock(SamzaTransformMetrics.class);
+    doNothing().when(samzaTransformMetrics).register(any(), any());
+    when(samzaTransformMetrics.getTransformLatencyMetric(transformName)).thenReturn(latency);
+    when(samzaTransformMetrics.getTransformCacheSize(transformName)).thenReturn(cacheSize);
+    return spy(new SamzaTransformMetricRegistry(samzaTransformMetrics));
+  }
+
+  /** Returns a new anonymous {@link BoundedWindow} whose max timestamp is the given millis. */
+  private static BoundedWindow windowEndingAt(final long maxTimestampMillis) {
+    return new BoundedWindow() {
+      @Override
+      public Instant maxTimestamp() {
+        return new Instant(maxTimestampMillis);
+      }
+    };
+  }
+}