This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f1504fb60b Basic Operator Metric Support For Data Shuffle (GBK and 
Combine Per Key) Operators for Samza Runner (#26649)
4f1504fb60b is described below

commit 4f1504fb60bde3685eb07852ad852445c9b8078d
Author: Sanil Jain <[email protected]>
AuthorDate: Thu May 18 17:00:59 2023 -0700

    Basic Operator Metric Support For Data Shuffle (GBK and Combine Per Key) 
Operators for Samza Runner (#26649)
---
 .../runners/samza/metrics/SamzaGBKMetricOp.java    | 194 +++++++++++++++++++++
 .../runners/samza/metrics/SamzaInputMetricOp.java  | 133 --------------
 ...SamzaOutputMetricOp.java => SamzaMetricOp.java} |  73 +++++---
 .../samza/metrics/SamzaMetricOpFactory.java        |  18 +-
 .../metrics/SamzaTransformMetricRegistry.java      |  66 +++++++
 .../samza/metrics/SamzaTransformMetrics.java       |  18 ++
 .../samza/translation/TranslationContext.java      |  20 ++-
 .../TestSamzaRunnerWithTransformMetrics.java       | 125 ++++++++++++-
 .../metrics/TestSamzaTransformMetricsRegistry.java | 191 ++++++++++++++++++++
 9 files changed, 656 insertions(+), 182 deletions(-)

diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaGBKMetricOp.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaGBKMetricOp.java
new file mode 100644
index 00000000000..897c4a05b15
--- /dev/null
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaGBKMetricOp.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.runners.samza.runtime.KeyedTimerData;
+import org.apache.beam.runners.samza.runtime.Op;
+import org.apache.beam.runners.samza.runtime.OpEmitter;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaGBKMetricOp is a {@link Op} that emits & maintains default metrics for 
input or output
+ * PCollection for GroupByKey.
+ *
+ * <p>For Input PCollection: It emits the input throughput and maintains avg 
input time for input
+ * PCollection per windowId.
+ *
+ * <p>For Output PCollection: It emits the output throughput and maintains avg 
output time for
+ * output PCollection per windowId. It is also responsible for emitting 
latency metric per windowId
+ * once the watermark passes the end of window timestamp.
+ *
+ * <p>Assumes that {@code SamzaGBKMetricOp#processWatermark(Instant, 
OpEmitter)} is exclusive of
+ * {@code SamzaGBKMetricOp#processElement(Instant, OpEmitter)}. Specifically, 
the processWatermark
+ * method assumes that no calls to processElement will be made during its 
execution, and vice versa.
+ *
+ * @param <T> The type of the elements in the input PCollection.
+ */
+class SamzaGBKMetricOp<T> implements Op<T, T, Void> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaGBKMetricOp.class);
+  // Unique name of the PTransform this MetricOp is associated with
+  private final String transformFullName;
+  private final SamzaTransformMetricRegistry samzaTransformMetricRegistry;
+  // Type of the processing operation
+  private final SamzaMetricOpFactory.OpType opType;
+
+  private final String pValue;
+  // Counters for keeping sum of arrival time and count of elements per 
windowId
+  @SuppressFBWarnings("SE_BAD_FIELD")
+  private final ConcurrentHashMap<BoundedWindow, BigInteger> 
sumOfTimestampsPerWindowId;
+
+  @SuppressFBWarnings("SE_BAD_FIELD")
+  private final ConcurrentHashMap<BoundedWindow, Long> sumOfCountPerWindowId;
+  // Name of the task, for logging purpose
+  private transient String task;
+
+  @Override
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void open(
+      Config config,
+      Context context,
+      Scheduler<KeyedTimerData<Void>> timerRegistry,
+      OpEmitter<T> emitter) {
+    // for logging / debugging purposes
+    this.task = 
context.getTaskContext().getTaskModel().getTaskName().getTaskName();
+    // Register the transform with SamzaTransformMetricRegistry
+    samzaTransformMetricRegistry.register(transformFullName, pValue, context);
+  }
+
+  // Some fields are initialized in open() method, which is called after the 
constructor.
+  @SuppressWarnings("initialization.fields.uninitialized")
+  public SamzaGBKMetricOp(
+      String pValue,
+      String transformFullName,
+      SamzaMetricOpFactory.OpType opType,
+      SamzaTransformMetricRegistry samzaTransformMetricRegistry) {
+    this.pValue = pValue;
+    this.transformFullName = transformFullName;
+    this.opType = opType;
+    this.samzaTransformMetricRegistry = samzaTransformMetricRegistry;
+    this.sumOfTimestampsPerWindowId = new ConcurrentHashMap<>();
+    this.sumOfCountPerWindowId = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void processElement(WindowedValue<T> inputElement, OpEmitter<T> 
emitter) {
+    // one element can belong to multiple windows
+    for (BoundedWindow windowId : inputElement.getWindows()) {
+      // Atomic updates to counts
+      sumOfCountPerWindowId.compute(
+          windowId,
+          (key, value) -> {
+            value = value == null ? Long.valueOf(0) : value;
+            return ++value;
+          });
+      // Atomic updates to sum of arrival timestamps
+      sumOfTimestampsPerWindowId.compute(
+          windowId,
+          (key, value) -> {
+            value = value == null ? BigInteger.ZERO : value;
+            return value.add(BigInteger.valueOf(System.nanoTime()));
+          });
+    }
+
+    switch (opType) {
+      case INPUT:
+        samzaTransformMetricRegistry
+            .getTransformMetrics()
+            .getTransformInputThroughput(transformFullName)
+            .inc();
+        break;
+      case OUTPUT:
+        samzaTransformMetricRegistry
+            .getTransformMetrics()
+            .getTransformOutputThroughput(transformFullName)
+            .inc();
+        break;
+    }
+    emitter.emitElement(inputElement);
+  }
+
+  @Override
+  public void processWatermark(Instant watermark, OpEmitter<T> emitter) {
+    final List<BoundedWindow> closedWindows = new ArrayList<>();
+    sumOfCountPerWindowId.keySet().stream()
+        .filter(windowId -> watermark.isAfter(windowId.maxTimestamp())) // 
window is closed
+        .forEach(
+            windowId -> {
+              // In case if BigInteger overflows for long we only retain the 
last 64 bits of the sum
+              long sumOfTimestamps =
+                  sumOfTimestampsPerWindowId.get(windowId) != null
+                      ? sumOfTimestampsPerWindowId.get(windowId).longValue()
+                      : 0L;
+              long count = sumOfCountPerWindowId.get(windowId);
+              closedWindows.add(windowId);
+
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                    "Processing {} Watermark for Transform: {}, WindowId:{}, 
count: {}, sumOfTimestamps: {}, task: {}",
+                    opType,
+                    transformFullName,
+                    windowId,
+                    count,
+                    sumOfTimestamps,
+                    task);
+              }
+
+              // if the window is closed and there is some data
+              if (sumOfTimestamps > 0 && count > 0) {
+                switch (opType) {
+                  case INPUT:
+                    // Update the arrival time for the window
+                    samzaTransformMetricRegistry.updateArrivalTimeMap(
+                        transformFullName, windowId, 
Math.floorDiv(sumOfTimestamps, count));
+                    break;
+                  case OUTPUT:
+                    // Compute the latency if there is some data for the window
+                    samzaTransformMetricRegistry.emitLatencyMetric(
+                        transformFullName, windowId, 
Math.floorDiv(sumOfTimestamps, count), task);
+                    break;
+                }
+              }
+            });
+
+    // remove the closed windows
+    sumOfCountPerWindowId.keySet().removeAll(closedWindows);
+    sumOfTimestampsPerWindowId.keySet().removeAll(closedWindows);
+
+    // Update the watermark progress for the transform output
+    if (opType == SamzaMetricOpFactory.OpType.OUTPUT) {
+      samzaTransformMetricRegistry
+          .getTransformMetrics()
+          .getTransformWatermarkProgress(transformFullName)
+          .set(watermark.getMillis());
+    }
+
+    emitter.emitWatermark(watermark);
+  }
+}
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaInputMetricOp.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaInputMetricOp.java
deleted file mode 100644
index 88360c689f2..00000000000
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaInputMetricOp.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.samza.metrics;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.beam.runners.samza.runtime.KeyedTimerData;
-import org.apache.beam.runners.samza.runtime.Op;
-import org.apache.beam.runners.samza.runtime.OpEmitter;
-import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.samza.config.Config;
-import org.apache.samza.context.Context;
-import org.apache.samza.operators.Scheduler;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * SamzaInputMetricOp emits & maintains default transform metrics for input 
PCollection to the
- * transform. It emits the input throughput and maintains avg arrival time for 
input PCollection per
- * watermark.
- *
- * <p>Assumes that {@code SamzaInputMetricOp#processWatermark(Instant, 
OpEmitter)} is exclusive of
- * {@code SamzaInputMetricOp#processElement(Instant, OpEmitter)}. 
Specifically, the processWatermark
- * method assumes that no calls to processElement will be made during its 
execution, and vice versa.
- *
- * @param <T> The type of the elements in the input PCollection.
- */
-class SamzaInputMetricOp<T> implements Op<T, T, Void> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaInputMetricOp.class);
-
-  // Unique name of the PTransform this MetricOp is associated with
-  protected final String transformFullName;
-  protected final SamzaTransformMetricRegistry samzaTransformMetricRegistry;
-  // Name or identifier of the PCollection which PTransform is processing
-  protected final String pValue;
-  // Counters to maintain avg arrival time per watermark for input PCollection.
-  private final AtomicLong count;
-  private final AtomicReference<BigInteger> sumOfTimestamps;
-  // List of input PValue(s) for all PCollections processing the PTransform
-  protected transient List<String> transformInputs;
-  // List of output PValue(s) for all PCollections processing the PTransform
-  protected transient List<String> transformOutputs;
-  // Name of the task, for logging purpose
-  protected transient String task;
-
-  // Some fields are initialized in open() method, which is called after the 
constructor.
-  @SuppressWarnings("initialization.fields.uninitialized")
-  public SamzaInputMetricOp(
-      String pValue,
-      String transformFullName,
-      SamzaTransformMetricRegistry samzaTransformMetricRegistry) {
-    this.transformFullName = transformFullName;
-    this.samzaTransformMetricRegistry = samzaTransformMetricRegistry;
-    this.pValue = pValue;
-    this.count = new AtomicLong(0L);
-    this.sumOfTimestamps = new AtomicReference<>(BigInteger.ZERO);
-  }
-
-  @Override
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  public void open(
-      Config config,
-      Context context,
-      Scheduler<KeyedTimerData<Void>> timerRegistry,
-      OpEmitter<T> emitter) {
-    final Map.Entry<List<String>, List<String>> transformInputOutput =
-        PipelineJsonRenderer.getTransformIOMap(config).get(transformFullName);
-    this.transformInputs =
-        transformInputOutput != null ? transformInputOutput.getKey() : new 
ArrayList();
-    this.transformOutputs =
-        transformInputOutput != null ? transformInputOutput.getValue() : new 
ArrayList();
-    // for logging / debugging purposes
-    this.task = 
context.getTaskContext().getTaskModel().getTaskName().getTaskName();
-    // Register the transform with SamzaTransformMetricRegistry
-    samzaTransformMetricRegistry.register(transformFullName, pValue, context);
-  }
-
-  @Override
-  public void processElement(WindowedValue<T> inputElement, OpEmitter<T> 
emitter) {
-    count.incrementAndGet();
-    sumOfTimestamps.updateAndGet(sum -> 
sum.add(BigInteger.valueOf(System.nanoTime())));
-    samzaTransformMetricRegistry
-        .getTransformMetrics()
-        .getTransformInputThroughput(transformFullName)
-        .inc();
-    emitter.emitElement(inputElement);
-  }
-
-  @Override
-  public void processWatermark(Instant watermark, OpEmitter<T> emitter) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(
-          "Processing Input Watermark for Transform: {} Count: {} 
SumOfTimestamps: {} for Watermark: {} for Task: {}",
-          transformFullName,
-          count.get(),
-          sumOfTimestamps.get().longValue(),
-          watermark.getMillis(),
-          task);
-    }
-    // if there is no input data then counters will be zero and only watermark 
will progress
-    if (count.get() > 0) {
-      // if BigInt.longValue is out of range for long then only the low-order 
64 bits are retained
-      long avg = Math.floorDiv(sumOfTimestamps.get().longValue(), count.get());
-      samzaTransformMetricRegistry.updateArrivalTimeMap(
-          transformFullName, pValue, watermark.getMillis(), avg);
-    }
-    // reset all counters
-    count.set(0L);
-    sumOfTimestamps.set(BigInteger.ZERO);
-    emitter.emitWatermark(watermark);
-  }
-}
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaOutputMetricOp.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOp.java
similarity index 72%
rename from 
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaOutputMetricOp.java
rename to 
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOp.java
index 1d34ff4c703..b624006fd98 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaOutputMetricOp.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOp.java
@@ -38,44 +38,47 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * SamzaOutputMetricOp is a metric Op that emits & maintains default transform 
metrics for output
- * PCollection to the transform. It emits the output throughput and maintains 
avg arrival time for
- * output PCollection per watermark.
+ * SamzaMetricOp is a metric Op that emits & maintains default transform 
metrics for inputs &
+ * outputs PCollection to the non data-shuffle transform. It emits the output 
throughput and
+ * maintains avg arrival time for input & output PCollection per watermark.
  *
- * <p>Assumes that {@code SamzaOutputMetricOp#processWatermark(Instant, 
OpEmitter)} is exclusive of
- * {@code SamzaOutputMetricOp#processElement(Instant, OpEmitter)}. 
Specifically, the
- * processWatermark method assumes that no calls to processElement will be 
made during its
- * execution, and vice versa.
+ * <p>Assumes that {@code SamzaMetricOp#processWatermark(Instant, OpEmitter)} 
is exclusive of {@code
+ * SamzaMetricOp#processElement(Instant, OpEmitter)}. Specifically, the 
processWatermark method
+ * assumes that no calls to processElement will be made during its execution, 
and vice versa.
  *
  * @param <T> The type of the elements in the output PCollection.
  */
-class SamzaOutputMetricOp<T> implements Op<T, T, Void> {
+class SamzaMetricOp<T> implements Op<T, T, Void> {
   // Unique name of the PTransform this MetricOp is associated with
-  protected final String transformFullName;
-  protected final SamzaTransformMetricRegistry samzaTransformMetricRegistry;
+  private final String transformFullName;
+  private final SamzaTransformMetricRegistry samzaTransformMetricRegistry;
   // Name or identifier of the PCollection which PTransform is processing
-  protected final String pValue;
+  private final String pValue;
   // Counters for output throughput
   private final AtomicLong count;
   private final AtomicReference<BigInteger> sumOfTimestamps;
+  // Type of the PTransform input or output
+  private final SamzaMetricOpFactory.OpType opType;
   // List of input PValue(s) for all PCollections processing the PTransform
-  protected transient List<String> transformInputs;
+  private transient List<String> transformInputs;
   // List of output PValue(s) for all PCollections processing the PTransform
-  protected transient List<String> transformOutputs;
+  private transient List<String> transformOutputs;
   // Name of the task, for logging purpose
-  protected transient String task;
+  private transient String task;
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaOutputMetricOp.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaMetricOp.class);
 
   // Some fields are initialized in open() method, which is called after the 
constructor.
   @SuppressWarnings("initialization.fields.uninitialized")
-  public SamzaOutputMetricOp(
+  public SamzaMetricOp(
       @NonNull String pValue,
       @NonNull String transformFullName,
+      SamzaMetricOpFactory.OpType opType,
       @NonNull SamzaTransformMetricRegistry samzaTransformMetricRegistry) {
     this.transformFullName = transformFullName;
     this.samzaTransformMetricRegistry = samzaTransformMetricRegistry;
     this.pValue = pValue;
+    this.opType = opType;
     this.count = new AtomicLong(0L);
     this.sumOfTimestamps = new AtomicReference<>(BigInteger.ZERO);
   }
@@ -104,10 +107,20 @@ class SamzaOutputMetricOp<T> implements Op<T, T, Void> {
     // update counters for timestamps
     count.incrementAndGet();
     sumOfTimestamps.updateAndGet(sum -> 
sum.add(BigInteger.valueOf(System.nanoTime())));
-    samzaTransformMetricRegistry
-        .getTransformMetrics()
-        .getTransformOutputThroughput(transformFullName)
-        .inc();
+    switch (opType) {
+      case INPUT:
+        samzaTransformMetricRegistry
+            .getTransformMetrics()
+            .getTransformInputThroughput(transformFullName)
+            .inc();
+        break;
+      case OUTPUT:
+        samzaTransformMetricRegistry
+            .getTransformMetrics()
+            .getTransformOutputThroughput(transformFullName)
+            .inc();
+        break;
+    }
     emitter.emitElement(inputElement);
   }
 
@@ -131,16 +144,20 @@ class SamzaOutputMetricOp<T> implements Op<T, T, Void> {
       // Update MetricOp Registry with avg arrival for the pValue
       samzaTransformMetricRegistry.updateArrivalTimeMap(
           transformFullName, pValue, watermark.getMillis(), avg);
-      // compute & emit the latency metric
-      samzaTransformMetricRegistry.emitLatencyMetric(
-          transformFullName, transformInputs, transformOutputs, 
watermark.getMillis(), task);
+      if (opType == SamzaMetricOpFactory.OpType.OUTPUT) {
+        // compute & emit the latency metric if the opType is OUTPUT
+        samzaTransformMetricRegistry.emitLatencyMetric(
+            transformFullName, transformInputs, transformOutputs, 
watermark.getMillis(), task);
+      }
     }
 
-    // update output watermark progress metric
-    samzaTransformMetricRegistry
-        .getTransformMetrics()
-        .getTransformWatermarkProgress(transformFullName)
-        .set(watermark.getMillis());
+    if (opType == SamzaMetricOpFactory.OpType.OUTPUT) {
+      // update output watermark progress metric
+      samzaTransformMetricRegistry
+          .getTransformMetrics()
+          .getTransformWatermarkProgress(transformFullName)
+          .set(watermark.getMillis());
+    }
 
     // reset all counters
     count.set(0L);
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOpFactory.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOpFactory.java
index 9d4f4013b0f..503e9049b98 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOpFactory.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricOpFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.samza.metrics;
 
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.samza.runtime.Op;
 import org.checkerframework.checker.nullness.qual.NonNull;
 
@@ -45,6 +46,7 @@ public class SamzaMetricOpFactory {
   /**
    * Create a {@link Op} for default transform metric computation.
    *
+   * @param urn URN of the PCollection metric Op is processing
    * @param pValue name of the PCollection metric Op is processing
    * @param transformName name of the PTransform for which metric Op is created
    * @param opType type of the metric
@@ -53,17 +55,19 @@ public class SamzaMetricOpFactory {
    * @return a {@link Op} for default transform metric computation
    */
   public static @NonNull <T> Op<T, T, Void> createMetricOp(
+      @NonNull String urn,
       @NonNull String pValue,
       @NonNull String transformName,
       @NonNull OpType opType,
       @NonNull SamzaTransformMetricRegistry samzaTransformMetricRegistry) {
-    switch (opType) {
-      case INPUT:
-        return new SamzaInputMetricOp(pValue, transformName, 
samzaTransformMetricRegistry);
-      case OUTPUT:
-        return new SamzaOutputMetricOp(pValue, transformName, 
samzaTransformMetricRegistry);
-      default:
-        throw new IllegalArgumentException("Unknown OpType: " + opType);
+    if (isDataShuffleTransform(urn)) {
+      return new SamzaGBKMetricOp<>(pValue, transformName, opType, 
samzaTransformMetricRegistry);
     }
+    return new SamzaMetricOp<>(pValue, transformName, opType, 
samzaTransformMetricRegistry);
+  }
+
+  private static boolean isDataShuffleTransform(String urn) {
+    return urn.equals(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN)
+        || urn.equals(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN);
   }
 }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetricRegistry.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetricRegistry.java
index 1dd0639e256..e4849c31fa6 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetricRegistry.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetricRegistry.java
@@ -17,11 +17,13 @@
  */
 package org.apache.beam.runners.samza.metrics;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.samza.context.Context;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -42,12 +44,17 @@ public class SamzaTransformMetricRegistry implements 
Serializable {
   // TransformName -> PValue for pCollection -> Map<WatermarkId, 
AvgArrivalTime>
   private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
ConcurrentHashMap<Long, Long>>>
       avgArrivalTimeMap;
+  // TransformName -> Map<WindowId, AvgArrivalTime>
+  @SuppressFBWarnings("SE_BAD_FIELD")
+  private final ConcurrentHashMap<String, ConcurrentHashMap<BoundedWindow, 
Long>>
+      avgArrivalTimeMapForGbk;
 
   // Per Transform Metrics for each primitive transform
   private final SamzaTransformMetrics transformMetrics;
 
   public SamzaTransformMetricRegistry() {
     this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+    this.avgArrivalTimeMapForGbk = new ConcurrentHashMap<>();
     this.transformMetrics = new SamzaTransformMetrics();
   }
 
@@ -55,6 +62,7 @@ public class SamzaTransformMetricRegistry implements 
Serializable {
   SamzaTransformMetricRegistry(SamzaTransformMetrics samzaTransformMetrics) {
     this.transformMetrics = samzaTransformMetrics;
     this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+    this.avgArrivalTimeMapForGbk = new ConcurrentHashMap<>();
   }
 
   public void register(String transformFullName, String pValue, Context ctx) {
@@ -62,6 +70,7 @@ public class SamzaTransformMetricRegistry implements 
Serializable {
     // initialize the map for the transform
     avgArrivalTimeMap.putIfAbsent(transformFullName, new 
ConcurrentHashMap<>());
     avgArrivalTimeMap.get(transformFullName).putIfAbsent(pValue, new 
ConcurrentHashMap<>());
+    avgArrivalTimeMapForGbk.putIfAbsent(transformFullName, new 
ConcurrentHashMap<>());
   }
 
   public SamzaTransformMetrics getTransformMetrics() {
@@ -80,6 +89,49 @@ public class SamzaTransformMetricRegistry implements 
Serializable {
     }
   }
 
+  public void updateArrivalTimeMap(String transformName, BoundedWindow 
windowId, long avg) {
+    ConcurrentHashMap<BoundedWindow, Long> avgArrivalTimeMapForTransform =
+        avgArrivalTimeMapForGbk.get(transformName);
+    if (avgArrivalTimeMapForTransform != null) {
+      avgArrivalTimeMapForTransform.put(windowId, avg);
+    }
+  }
+
+  @SuppressWarnings("nullness")
+  public void emitLatencyMetric(
+      String transformName, BoundedWindow windowId, long avgArrivalEndTime, 
String taskName) {
+    Long avgArrivalStartTime =
+        avgArrivalTimeMapForGbk.get(transformName) != null
+            ? avgArrivalTimeMapForGbk.get(transformName).remove(windowId)
+            : null;
+
+    if (avgArrivalStartTime == null || avgArrivalStartTime == 0 || 
avgArrivalEndTime == 0) {
+      LOG.debug(
+          "Failure to Emit Metric for Transform: {}, Start-Time: {} or 
End-Time: {} found is 0/null for windowId: {}, task: {}",
+          transformName,
+          avgArrivalStartTime,
+          avgArrivalEndTime,
+          windowId,
+          taskName);
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Success Emit Metric for Transform: {}, window: {} for task: {}",
+          transformName,
+          windowId,
+          taskName);
+    }
+    transformMetrics
+        .getTransformLatencyMetric(transformName)
+        .update(avgArrivalEndTime - avgArrivalStartTime);
+
+    transformMetrics
+        .getTransformCacheSize(transformName)
+        .set((long) avgArrivalTimeMapForGbk.get(transformName).size());
+  }
+
   // Checker framework bug: 
https://github.com/typetools/checker-framework/issues/979
   @SuppressWarnings("return")
   public void emitLatencyMetric(
@@ -126,6 +178,14 @@ public class SamzaTransformMetricRegistry implements 
Serializable {
     final long endTime = Collections.max(outputPValuesAvgArrivalTimes);
     final long latency = endTime - startTime;
     transformMetrics.getTransformLatencyMetric(transformName).update(latency);
+
+    transformMetrics
+        .getTransformCacheSize(transformName)
+        .set(
+            avgArrivalTimeMapForTransform.values().stream()
+                .mapToLong(ConcurrentHashMap::size)
+                .sum());
+
     LOG.debug(
         "Success Emit Metric Transform: {} for watermark: {} for task: {}",
         transformName,
@@ -139,4 +199,10 @@ public class SamzaTransformMetricRegistry implements 
Serializable {
       String transformName) {
     return avgArrivalTimeMap.get(transformName);
   }
+
+  @VisibleForTesting
+  @Nullable
+  ConcurrentHashMap<BoundedWindow, Long> getAverageArrivalTimeMapForGBK(String 
transformName) {
+    return avgArrivalTimeMapForGbk.get(transformName);
+  }
 }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetrics.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetrics.java
index 13fbf6007e5..49da3effd8c 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetrics.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetrics.java
@@ -43,6 +43,8 @@ public class SamzaTransformMetrics implements Serializable {
   private static final String TRANSFORM_IP_THROUGHPUT = "num-input-messages";
   private static final String TRANSFORM_OP_THROUGHPUT = "num-output-messages";
 
+  private static final String TRANSFORM_ARRIVAL_TIME_CACHE_SIZE = 
"in-mem-cache-size";
+
   // Transform name to metric maps
   @SuppressFBWarnings("SE_BAD_FIELD")
   private final Map<String, Timer> transformLatency;
@@ -56,11 +58,15 @@ public class SamzaTransformMetrics implements Serializable {
   @SuppressFBWarnings("SE_BAD_FIELD")
   private final Map<String, Counter> transformOutputThroughPut;
 
+  @SuppressFBWarnings("SE_BAD_FIELD")
+  private final Map<String, Gauge<Long>> transformCacheSize;
+
   public SamzaTransformMetrics() {
     this.transformLatency = new ConcurrentHashMap<>();
     this.transformOutputThroughPut = new ConcurrentHashMap<>();
     this.transformWatermarkProgress = new ConcurrentHashMap<>();
     this.transformInputThroughput = new ConcurrentHashMap<>();
+    this.transformCacheSize = new ConcurrentHashMap<>();
   }
 
   public void register(String transformName, Context ctx) {
@@ -91,6 +97,14 @@ public class SamzaTransformMetrics implements Serializable {
         transformName,
         metricsRegistry.newCounter(
             GROUP, getMetricNameWithPrefix(TRANSFORM_IP_THROUGHPUT, 
transformName)));
+    transformCacheSize.putIfAbsent(
+        transformName,
+        ctx.getTaskContext()
+            .getTaskMetricsRegistry()
+            .newGauge(
+                GROUP,
+                getMetricNameWithPrefix(TRANSFORM_ARRIVAL_TIME_CACHE_SIZE, 
transformName),
+                0L));
   }
 
   public Timer getTransformLatencyMetric(String transformName) {
@@ -105,6 +119,10 @@ public class SamzaTransformMetrics implements Serializable 
{
     return transformOutputThroughPut.get(transformName);
   }
 
+  public Gauge<Long> getTransformCacheSize(String transformName) {
+    return transformCacheSize.get(transformName);
+  }
+
   public Gauge<Long> getTransformWatermarkProgress(String transformName) {
     return transformWatermarkProgress.get(transformName);
   }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
index 09ed68457b3..e3e4bb46fce 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
@@ -182,13 +182,10 @@ public class TranslationContext {
       TransformHierarchy.Node node,
       SamzaMetricOpFactory.OpType opType) {
     final Boolean enableTransformMetrics = 
getPipelineOptions().getEnableTransformMetrics();
-    final String urn = PTransformTranslation.urnForTransformOrNull(transform);
+    final String transformURN = 
PTransformTranslation.urnForTransformOrNull(transform);
 
-    // skip attach transform if user override is false or transform is GBK
-    if (!enableTransformMetrics
-        || urn == null
-        || urn.equals(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN)
-        || urn.equals(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN)) {
+    // skip attach transform if user override is false or transform is not 
registered
+    if (!enableTransformMetrics || transformURN == null) {
       return;
     }
 
@@ -198,11 +195,22 @@ public class TranslationContext {
     }
 
     for (PValue pValue : getPValueForTransform(opType, transform, node)) {
+      // skip attaching the metric op if pValue is not registered, i.e. it was not 
translated by a Samza
+      // translator
+      if (!messsageStreams.containsKey(pValue)) {
+        LOG.debug(
+            "Skip attach transform metric op for pValue: {} for transform: {}",
+            pValue,
+            getTransformFullName());
+        continue;
+      }
+
       // add another step for default metric computation
       getMessageStream(pValue)
           .flatMapAsync(
               OpAdapter.adapt(
                   SamzaMetricOpFactory.createMetricOp(
+                      transformURN,
                       pValue.getName(),
                       getTransformFullName(),
                       opType,
diff --git 
a/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaRunnerWithTransformMetrics.java
 
b/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaRunnerWithTransformMetrics.java
index 60a1a20d300..a605f4fc27b 100644
--- 
a/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaRunnerWithTransformMetrics.java
+++ 
b/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaRunnerWithTransformMetrics.java
@@ -35,10 +35,13 @@ import 
org.apache.beam.runners.samza.util.InMemoryMetricsReporter;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -49,6 +52,7 @@ import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.Metric;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.system.WatermarkMessage;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
 
@@ -63,7 +67,7 @@ public class TestSamzaRunnerWithTransformMetrics {
     TestSamzaRunner testSamzaRunner = TestSamzaRunner.fromOptions(options);
     Pipeline pipeline = Pipeline.create(options);
     // Create a pipeline
-    PCollection<KV<String, Integer>> output =
+    PCollection<KV<String, Long>> output =
         pipeline
             .apply(
                 "Mock data",
@@ -73,9 +77,12 @@ public class TestSamzaRunnerWithTransformMetrics {
                     KV.of("hello", KV.of("b", 42)),
                     KV.of("hello", KV.of("c", 12))))
             .apply("Filter valid keys", Filter.by(x -> 
x.getKey().equals("hello")))
-            .apply(Values.create());
+            .apply(Values.create())
+            .apply("Fixed-window", 
Window.into(FixedWindows.of(Duration.standardSeconds(10))))
+            .apply(Count.perKey());
+
     // check pipeline is working fine
-    PAssert.that(output).containsInAnyOrder(KV.of("a", 97), KV.of("b", 42), 
KV.of("c", 12));
+    PAssert.that(output).containsInAnyOrder(KV.of("a", 1L), KV.of("b", 1L), 
KV.of("c", 1L));
     testSamzaRunner.run(pipeline);
 
     Map<String, Metric> pTransformContainerMetrics =
@@ -122,15 +129,31 @@ public class TestSamzaRunnerWithTransformMetrics {
                 pTransformContainerMetrics.get(
                     
"Values_Values_Map_ParMultiDo_Anonymous_-num-output-messages"))
             .getCount());
+    assertEquals(
+        3,
+        ((Counter) 
pTransformContainerMetrics.get("Fixed_window_Window_Assign-num-input-messages"))
+            .getCount());
+    assertEquals(
+        3,
+        ((Counter) 
pTransformContainerMetrics.get("Fixed_window_Window_Assign-num-output-messages"))
+            .getCount());
+    assertEquals(
+        3,
+        ((Counter) 
pTransformContainerMetrics.get("Combine_perKey_Count_-num-input-messages"))
+            .getCount());
+    assertEquals(
+        3,
+        ((Counter) 
pTransformContainerMetrics.get("Combine_perKey_Count_-num-output-messages"))
+            .getCount());
 
     // Throughput Metrics are per container by default
     assertNotNull(
         pTransformContainerMetrics.get(
             
"Filter_valid_keys_ParDo_Anonymous__ParMultiDo_Anonymous_-handle-message-ns"));
-    // One message is dropped from filter
     assertNotNull(
         pTransformContainerMetrics.get(
             "Values_Values_Map_ParMultiDo_Anonymous_-handle-message-ns"));
+    
assertNotNull(pTransformContainerMetrics.get("Combine_perKey_Count_-handle-message-ns"));
 
     // Watermark Metrics are Per task by default
     assertTrue(
@@ -143,6 +166,7 @@ public class TestSamzaRunnerWithTransformMetrics {
             
"Filter_valid_keys_ParDo_Anonymous__ParMultiDo_Anonymous_-output-watermark-ms"));
     assertNotNull(
         
pTransformTaskMetrics.get("Values_Values_Map_ParMultiDo_Anonymous_-output-watermark-ms"));
+    
assertNotNull(pTransformTaskMetrics.get("Combine_perKey_Count_-output-watermark-ms"));
   }
 
   @Test
@@ -161,6 +185,7 @@ public class TestSamzaRunnerWithTransformMetrics {
     Counter inputCounter = new Counter("filter-input-counter");
     Counter outputCounter = new Counter("filter-output-counter");
     Gauge<Long> watermarkProgress = new Gauge<>("filter-output-watermark", 0L);
+    Gauge<Long> cacheSize = new Gauge<>("filter-arrival-time-cache-size", 0L);
     Timer latency = new Timer("filter-latency");
 
     SamzaTransformMetrics samzaTransformMetrics = 
mock(SamzaTransformMetrics.class);
@@ -170,14 +195,19 @@ public class TestSamzaRunnerWithTransformMetrics {
     when(samzaTransformMetrics.getTransformWatermarkProgress("filter"))
         .thenReturn(watermarkProgress);
     
when(samzaTransformMetrics.getTransformLatencyMetric("filter")).thenReturn(latency);
+    
when(samzaTransformMetrics.getTransformCacheSize("filter")).thenReturn(cacheSize);
 
     SamzaTransformMetricRegistry samzaTransformMetricRegistry =
         spy(new SamzaTransformMetricRegistry(samzaTransformMetrics));
     samzaTransformMetricRegistry.register("filter", "dummy-pvalue.in", 
mock(Context.class));
     samzaTransformMetricRegistry.register("filter", "dummy-pvalue.out", 
mock(Context.class));
 
-    SamzaInputMetricOp<String> inputMetricOp =
-        new SamzaInputMetricOp<>("dummy-pvalue.in", "filter", 
samzaTransformMetricRegistry);
+    SamzaMetricOp<String> inputMetricOp =
+        new SamzaMetricOp<>(
+            "dummy-pvalue.in",
+            "filter",
+            SamzaMetricOpFactory.OpType.INPUT,
+            samzaTransformMetricRegistry);
 
     inputMetricOp.processElement(windowedValue, opEmitter);
     inputMetricOp.processElement(windowedValue2, opEmitter);
@@ -192,8 +222,12 @@ public class TestSamzaRunnerWithTransformMetrics {
             .get("dummy-pvalue.in")
             .containsKey(watermarkMessage.getTimestamp()));
 
-    SamzaOutputMetricOp<String> outputMetricOp =
-        new SamzaOutputMetricOp<>("dummy-pvalue.out", "filter", 
samzaTransformMetricRegistry);
+    SamzaMetricOp<String> outputMetricOp =
+        new SamzaMetricOp<>(
+            "dummy-pvalue.out",
+            "filter",
+            SamzaMetricOpFactory.OpType.OUTPUT,
+            samzaTransformMetricRegistry);
     outputMetricOp.init(ImmutableList.of("dummy-pvalue.in"), 
ImmutableList.of("dummy-pvalue.out"));
 
     outputMetricOp.processElement(windowedValue, opEmitter);
@@ -206,5 +240,80 @@ public class TestSamzaRunnerWithTransformMetrics {
     assertEquals(watermarkMessage.getTimestamp(), 
watermarkProgress.getValue().longValue());
     // Latency must be positive
     assertTrue(latency.getSnapshot().getAverage() > 0);
+    // Cache size must be 0
+    assertEquals(0, cacheSize.getValue().intValue());
+  }
+
+  @Test
+  public void testSamzaInputAndOutputGBKMetricOp() {
+    final WindowedValue<String> windowedValue =
+        WindowedValue.timestampedValueInGlobalWindow("value-1", new Instant());
+    final WindowedValue<String> windowedValue2 =
+        WindowedValue.timestampedValueInGlobalWindow("value-2", new Instant());
+    final WatermarkMessage watermarkMessage =
+        new WatermarkMessage(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+
+    OpEmitter<String> opEmitter = mock(OpEmitter.class);
+    doNothing().when(opEmitter).emitElement(any());
+    doNothing().when(opEmitter).emitWatermark(any());
+
+    Counter inputCounter = new Counter("Count-perKey-input-counter");
+    Counter outputCounter = new Counter("Count-perKey-output-counter");
+    Gauge<Long> watermarkProgress = new 
Gauge<>("Count-perKey-output-watermark", 0L);
+    Timer latency = new Timer("Count-perKey-latency");
+    Gauge<Long> cacheSize = new 
Gauge<>("Count-perKey-arrival-time-cache-size", 0L);
+
+    SamzaTransformMetrics samzaTransformMetrics = 
mock(SamzaTransformMetrics.class);
+    doNothing().when(samzaTransformMetrics).register(any(), any());
+    when(samzaTransformMetrics.getTransformInputThroughput("Count-perKey"))
+        .thenReturn(inputCounter);
+    when(samzaTransformMetrics.getTransformOutputThroughput("Count-perKey"))
+        .thenReturn(outputCounter);
+    when(samzaTransformMetrics.getTransformWatermarkProgress("Count-perKey"))
+        .thenReturn(watermarkProgress);
+    
when(samzaTransformMetrics.getTransformLatencyMetric("Count-perKey")).thenReturn(latency);
+    
when(samzaTransformMetrics.getTransformCacheSize("Count-perKey")).thenReturn(cacheSize);
+
+    SamzaTransformMetricRegistry samzaTransformMetricRegistry =
+        spy(new SamzaTransformMetricRegistry(samzaTransformMetrics));
+    samzaTransformMetricRegistry.register("Count-perKey", "window-assign.in", 
mock(Context.class));
+    samzaTransformMetricRegistry.register("Count-perKey", "window-assign.out", 
mock(Context.class));
+
+    SamzaGBKMetricOp<String> inputMetricOp =
+        new SamzaGBKMetricOp<>(
+            "window-assign.in",
+            "Count-perKey",
+            SamzaMetricOpFactory.OpType.INPUT,
+            samzaTransformMetricRegistry);
+
+    inputMetricOp.processElement(windowedValue, opEmitter);
+    inputMetricOp.processElement(windowedValue2, opEmitter);
+    inputMetricOp.processWatermark(new 
Instant(watermarkMessage.getTimestamp()), opEmitter);
+
+    // Input throughput must be updated
+    assertEquals(2, inputCounter.getCount());
+    // Avg arrival time must be present for the single Global Window
+    assertEquals(
+        1, 
samzaTransformMetricRegistry.getAverageArrivalTimeMapForGBK("Count-perKey").size());
+
+    SamzaGBKMetricOp<String> outputMetricOp =
+        new SamzaGBKMetricOp<>(
+            "window-assign.out",
+            "Count-perKey",
+            SamzaMetricOpFactory.OpType.OUTPUT,
+            samzaTransformMetricRegistry);
+
+    outputMetricOp.processElement(windowedValue, opEmitter);
+    outputMetricOp.processElement(windowedValue2, opEmitter);
+    outputMetricOp.processWatermark(new 
Instant(watermarkMessage.getTimestamp()), opEmitter);
+
+    // Output throughput must be updated
+    assertEquals(2, outputCounter.getCount());
+    // Output watermark must be updated
+    assertEquals(watermarkMessage.getTimestamp(), 
watermarkProgress.getValue().longValue());
+    // Latency must be positive
+    assertTrue(latency.getSnapshot().getAverage() > 0);
+    // Cache size must be 0
+    assertEquals(0, cacheSize.getValue().intValue());
   }
 }
diff --git 
a/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaTransformMetricsRegistry.java
 
b/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaTransformMetricsRegistry.java
new file mode 100644
index 00000000000..f8dacc3aec6
--- /dev/null
+++ 
b/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaTransformMetricsRegistry.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.system.WatermarkMessage;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+public class TestSamzaTransformMetricsRegistry {
+
+  @Test
+  public void testSamzaTransformMetricsRegistryForNonShuffleOperators() {
+    final WatermarkMessage watermarkMessage =
+        new WatermarkMessage(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    final long avgInputArrivalTime = System.currentTimeMillis();
+
+    Timer latency = new Timer("filter-latency");
+    Gauge<Long> cacheSize = new Gauge<Long>("filter-cache-size", 0L);
+    SamzaTransformMetrics samzaTransformMetrics = 
mock(SamzaTransformMetrics.class);
+    doNothing().when(samzaTransformMetrics).register(any(), any());
+    
when(samzaTransformMetrics.getTransformLatencyMetric("filter")).thenReturn(latency);
+    
when(samzaTransformMetrics.getTransformCacheSize("filter")).thenReturn(cacheSize);
+
+    SamzaTransformMetricRegistry samzaTransformMetricRegistry =
+        spy(new SamzaTransformMetricRegistry(samzaTransformMetrics));
+
+    samzaTransformMetricRegistry.register("filter", "dummy-pvalue.in", 
mock(Context.class));
+    samzaTransformMetricRegistry.register("filter", "dummy-pvalue.out", 
mock(Context.class));
+
+    // Update the avg arrival time
+    samzaTransformMetricRegistry.updateArrivalTimeMap(
+        "filter", "dummy-pvalue.in", watermarkMessage.getTimestamp(), 
avgInputArrivalTime);
+    samzaTransformMetricRegistry.updateArrivalTimeMap(
+        "filter", "dummy-pvalue.out", watermarkMessage.getTimestamp(), 
avgInputArrivalTime + 100);
+
+    // Check the avg arrival time is updated
+    assertEquals(
+        avgInputArrivalTime,
+        samzaTransformMetricRegistry
+            .getAverageArrivalTimeMap("filter")
+            .get("dummy-pvalue.in")
+            .get(watermarkMessage.getTimestamp())
+            .longValue());
+    assertEquals(
+        avgInputArrivalTime + 100,
+        samzaTransformMetricRegistry
+            .getAverageArrivalTimeMap("filter")
+            .get("dummy-pvalue.out")
+            .get(watermarkMessage.getTimestamp())
+            .longValue());
+
+    // Emit the latency metric
+    samzaTransformMetricRegistry.emitLatencyMetric(
+        "filter",
+        ImmutableList.of("dummy-pvalue.in"),
+        ImmutableList.of("dummy-pvalue.out"),
+        watermarkMessage.getTimestamp(),
+        "task 0");
+
+    // Check the latency metric is updated
+    assertTrue(100 == latency.getSnapshot().getAverage());
+
+    // Check the avg arrival time is cleared
+    assertFalse(
+        samzaTransformMetricRegistry
+            .getAverageArrivalTimeMap("filter")
+            .get("dummy-pvalue.in")
+            .containsKey(watermarkMessage.getTimestamp()));
+    assertFalse(
+        samzaTransformMetricRegistry
+            .getAverageArrivalTimeMap("filter")
+            .get("dummy-pvalue.out")
+            .containsKey(watermarkMessage.getTimestamp()));
+    // Cache size must be 0
+    assertEquals(0, cacheSize.getValue().intValue());
+  }
+
+  @Test
+  public void testSamzaTransformMetricsRegistryForDataShuffleOperators() {
+    Timer latency = new Timer("Count-perKey-latency");
+    Gauge<Long> cacheSize = new Gauge<Long>("Count-perKey-cache-size", 0L);
+
+    SamzaTransformMetrics samzaTransformMetrics = 
mock(SamzaTransformMetrics.class);
+    doNothing().when(samzaTransformMetrics).register(any(), any());
+    
when(samzaTransformMetrics.getTransformLatencyMetric("Count.perKey")).thenReturn(latency);
+    
when(samzaTransformMetrics.getTransformCacheSize("Count.perKey")).thenReturn(cacheSize);
+
+    SamzaTransformMetricRegistry samzaTransformMetricRegistry =
+        spy(new SamzaTransformMetricRegistry(samzaTransformMetrics));
+
+    samzaTransformMetricRegistry.register("Count.perKey", "window-assign.in", 
mock(Context.class));
+    samzaTransformMetricRegistry.register("Count.perKey", "window-assign.out", 
mock(Context.class));
+
+    final BoundedWindow first =
+        new BoundedWindow() {
+          @Override
+          public Instant maxTimestamp() {
+            return new Instant(2048L);
+          }
+        };
+    final BoundedWindow second =
+        new BoundedWindow() {
+          @Override
+          public Instant maxTimestamp() {
+            return new Instant(689743L);
+          }
+        };
+
+    // Update the avg arrival time
+    samzaTransformMetricRegistry.updateArrivalTimeMap("Count.perKey", first, 
1048L);
+    samzaTransformMetricRegistry.updateArrivalTimeMap("Count.perKey", second, 
4897L);
+
+    // Check the avg arrival time is updated
+    assertEquals(
+        1048L,
+        samzaTransformMetricRegistry
+            .getAverageArrivalTimeMapForGBK("Count.perKey")
+            .get(first)
+            .longValue());
+    assertEquals(
+        4897L,
+        samzaTransformMetricRegistry
+            .getAverageArrivalTimeMapForGBK("Count.perKey")
+            .get(second)
+            .longValue());
+
+    // Emit the latency metric
+    samzaTransformMetricRegistry.emitLatencyMetric("Count.perKey", first, 
2048L, "task 0");
+    samzaTransformMetricRegistry.emitLatencyMetric("Count.perKey", second, 
5897L, "task 0");
+
+    // Check the latency metric is updated
+    assertTrue(1000 == latency.getSnapshot().getAverage());
+
+    // Check the avg arrival time is cleared
+    assertFalse(
+        samzaTransformMetricRegistry
+            .getAverageArrivalTimeMapForGBK("Count.perKey")
+            .containsKey(first));
+    assertFalse(
+        samzaTransformMetricRegistry
+            .getAverageArrivalTimeMapForGBK("Count.perKey")
+            .containsKey(second));
+
+    // Failure testing: updates for an unregistered transform must be ignored
+    samzaTransformMetricRegistry.updateArrivalTimeMap("random-transform", 
first, 1048L);
+    samzaTransformMetricRegistry.updateArrivalTimeMap("random-transform", 
first, 1048L);
+    // No data updated
+    assertFalse(
+        samzaTransformMetricRegistry
+            .getAverageArrivalTimeMapForGBK("Count.perKey")
+            .containsKey(first));
+    
assertNull(samzaTransformMetricRegistry.getAverageArrivalTimeMapForGBK("random-transform"));
+    // Emitting latency for an unregistered transform must be a no-op
+    samzaTransformMetricRegistry.emitLatencyMetric("random-transform", first, 
2048L, "task 0");
+    samzaTransformMetricRegistry.emitLatencyMetric("random-transform", first, 
0, "task 0");
+    // Check the latency metric is unchanged
+    assertTrue(1000 == latency.getSnapshot().getAverage());
+    // Cache size must be 0
+    assertEquals(0, cacheSize.getValue().intValue());
+  }
+}

Reply via email to