xinyuiscool commented on code in PR #26437:
URL: https://github.com/apache/beam/pull/26437#discussion_r1187650581


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaInputMetricOp.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.samza.runtime.KeyedTimerData;
+import org.apache.beam.runners.samza.runtime.Op;
+import org.apache.beam.runners.samza.runtime.OpEmitter;
+import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaInputMetricOp emits & maintains default transform metrics for input 
PCollection to the
+ * transform. It emits the input throughput and maintains avg arrival time for 
input PCollection per
+ * watermark.
+ *
+ * <p>Assumes that {@code SamzaInputMetricOp#processWatermark(Instant, 
OpEmitter)} is exclusive of
+ * {@code SamzaInputMetricOp#processElement(Instant, OpEmitter)}. 
Specifically, the processWatermark
+ * method assumes that no calls to processElement will be made during its 
execution, and vice versa.
+ *
+ * @param <T> The type of the elements in the input PCollection.
+ */
+class SamzaInputMetricOp<T> implements Op<T, T, Void> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaInputMetricOp.class);
+
+  // Unique name of the PTransform this MetricOp is associated with
+  protected final String transformFullName;
+  protected final SamzaTransformMetricRegistry samzaTransformMetricRegistry;
+  // Name or identifier of the PCollection which PTransform is processing
+  protected final String pValue;
+  // List of input PValue(s) for all PCollections processing the PTransform
+  protected transient List<String> transformInputs;
+  // List of output PValue(s) for all PCollections processing the PTransform
+  protected transient List<String> transformOutputs;
+  // Name of the task, for logging purpose
+  protected transient String task;
+  // Counters to maintain avg arrival time per watermark for input PCollection.
+  private final AtomicLong count;
+  private AtomicReference<BigInteger> sumOfTimestamps;

Review Comment:
   Should sumOfTimestamps be final as well? Please move these two fields up next to the other final fields so that
they are all declared together.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaOutputMetricOp.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.samza.runtime.KeyedTimerData;
+import org.apache.beam.runners.samza.runtime.Op;
+import org.apache.beam.runners.samza.runtime.OpEmitter;
+import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaOutputMetricOp is a metric Op that emits & maintains default transform 
metrics for output
+ * PCollection to the transform. It emits the output throughput and maintains 
avg arrival time for
+ * output PCollection per watermark.
+ *
+ * <p>Assumes that {@code SamzaOutputMetricOp#processWatermark(Instant, 
OpEmitter)} is exclusive of
+ * {@code SamzaOutputMetricOp#processElement(Instant, OpEmitter)}. 
Specifically, the
+ * processWatermark method assumes that no calls to processElement will be 
made during its
+ * execution, and vice versa.
+ *
+ * @param <T> The type of the elements in the output PCollection.
+ */
+class SamzaOutputMetricOp<T> implements Op<T, T, Void> {
+  // Unique name of the PTransform this MetricOp is associated with
+  protected final String transformFullName;
+  protected final SamzaTransformMetricRegistry samzaTransformMetricRegistry;
+  // Name or identifier of the PCollection which PTransform is processing
+  protected final String pValue;
+  // List of input PValue(s) for all PCollections processing the PTransform
+  protected transient List<String> transformInputs;
+  // List of output PValue(s) for all PCollections processing the PTransform
+  protected transient List<String> transformOutputs;
+  // Name of the task, for logging purpose
+  protected transient String task;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaOutputMetricOp.class);
+  // Counters for output throughput
+  private AtomicLong count;
+  private AtomicReference<BigInteger> sumOfTimestamps;

Review Comment:
   Should these be final? Same as above: please move the final fields so they are declared together.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaOutputMetricOp.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.samza.runtime.KeyedTimerData;
+import org.apache.beam.runners.samza.runtime.Op;
+import org.apache.beam.runners.samza.runtime.OpEmitter;
+import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaOutputMetricOp is a metric Op that emits & maintains default transform 
metrics for output
+ * PCollection to the transform. It emits the output throughput and maintains 
avg arrival time for
+ * output PCollection per watermark.
+ *
+ * <p>Assumes that {@code SamzaOutputMetricOp#processWatermark(Instant, 
OpEmitter)} is exclusive of
+ * {@code SamzaOutputMetricOp#processElement(Instant, OpEmitter)}. 
Specifically, the
+ * processWatermark method assumes that no calls to processElement will be 
made during its
+ * execution, and vice versa.
+ *
+ * @param <T> The type of the elements in the output PCollection.
+ */
+class SamzaOutputMetricOp<T> implements Op<T, T, Void> {
+  // Unique name of the PTransform this MetricOp is associated with
+  protected final String transformFullName;
+  protected final SamzaTransformMetricRegistry samzaTransformMetricRegistry;
+  // Name or identifier of the PCollection which PTransform is processing
+  protected final String pValue;
+  // List of input PValue(s) for all PCollections processing the PTransform
+  protected transient List<String> transformInputs;
+  // List of output PValue(s) for all PCollections processing the PTransform
+  protected transient List<String> transformOutputs;
+  // Name of the task, for logging purpose
+  protected transient String task;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaOutputMetricOp.class);
+  // Counters for output throughput
+  private AtomicLong count;

Review Comment:
   Should this be final?



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -168,6 +177,81 @@ public <OutT> MessageStream<OpMessage<OutT>> 
getMessageStream(PValue pvalue) {
     return stream;
   }
 
+  public <InT extends PValue, OutT extends PValue> void 
attachTransformMetricOp(
+      PTransform<InT, OutT> transform,
+      TransformHierarchy.Node node,
+      SamzaMetricOpFactory.OpType opType) {
+    final Boolean enableTransformMetrics = 
getPipelineOptions().getEnableTransformMetrics();
+    final String urn = PTransformTranslation.urnForTransformOrNull(transform);
+
+    // skip attach transform if user override is false or transform is GBK
+    if (!enableTransformMetrics
+        || urn == null
+        || urn.equals(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN)
+        || urn.equals(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN)) {
+      return;
+    }
+
+    // skip attach transform if transform is reading from external sources
+    if (isIOTransform(node, opType)) {
+      return;
+    }
+
+    for (PValue pValue : getPValueForTransform(opType, transform, node)) {
+      // add another step for default metric computation
+      getMessageStream(pValue)
+          .flatMapAsync(
+              OpAdapter.adapt(
+                  SamzaMetricOpFactory.createMetricOp(
+                      pValue.getName(),
+                      getTransformFullName(),
+                      opType,
+                      samzaTransformMetricRegistry),
+                  this));
+    }
+  }
+
+  // Get the input or output PValue for a transform
+  private <InT extends PValue, OutT extends PValue> List<PValue> 
getPValueForTransform(
+      SamzaMetricOpFactory.OpType opType,
+      @NonNull PTransform<InT, OutT> transform,
+      @NonNull TransformHierarchy.Node node) {
+    switch (opType) {
+      case INPUT:
+        {
+          if (node.getInputs().size() > 1) {
+            return node.getInputs().entrySet().stream()
+                .map(Map.Entry::getValue)
+                .collect(Collectors.toList());
+          } else {
+            return ImmutableList.of(getInput(transform));
+          }
+        }
+      case OUTPUT:
+        if (node.getOutputs().size() > 1) {
+          return node.getOutputs().entrySet().stream()
+              .map(Map.Entry::getValue)
+              .collect(Collectors.toList());
+        }
+        return ImmutableList.of(getOutput(transform));
+      default:
+        throw new IllegalArgumentException("Unknown opType: " + opType);
+    }
+  }
+
+  // Transforms that read or write to/from external sources are not supported
+  private boolean isIOTransform(

Review Comment:
   Can this method be static?



##########
runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaRunnerWithTransformMetrics.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.runners.samza.TestSamzaRunner;
+import org.apache.beam.runners.samza.util.InMemoryMetricsReporter;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.Metric;
+import org.junit.Test;
+
+public class TestSamzaRunnerWithTransformMetrics {

Review Comment:
   Thanks for adding the tests. For functional tests, can we add one for
SamzaInputMetricOp and one for SamzaOutputMetricOp? We can populate the time
and watermark and verify that the latency output is correct.



##########
runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java:
##########
@@ -43,45 +53,104 @@
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.system.descriptors.GenericSystemDescriptor;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({OpAdapter.class, PTransformTranslation.class})
 @SuppressWarnings({"rawtypes"})
 public class TranslationContextTest {
-  private final GenericInputDescriptor testInputDescriptor =
-      new GenericSystemDescriptor("mockSystem", "mockFactoryClassName")
-          .getInputDescriptor("test-input-1", mock(Serde.class));
-  MapFunction<Object, String> keyFn = m -> m.toString();
-  MapFunction<Object, Object> valueFn = m -> m;
-  private final String streamName = "testStream";
-  KVSerde<Object, Object> serde = KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>());
-  StreamApplicationDescriptor streamApplicationDescriptor =
-      new StreamApplicationDescriptorImpl(
-          appDesc -> {
-            MessageStream inputStream = 
appDesc.getInputStream(testInputDescriptor);
-            inputStream.partitionBy(keyFn, valueFn, serde, streamName);
-          },
-          getConfig());
-  Map<PValue, String> idMap = new HashMap<>();
-  Set<String> nonUniqueStateIds = new HashSet<>();
-  TranslationContext translationContext =
-      new TranslationContext(
-          streamApplicationDescriptor, idMap, nonUniqueStateIds, 
mock(SamzaPipelineOptions.class));
+  TranslationContext translationContext;
+  AppliedPTransform pTransform;
+  PCollection output;
+  List inputDescriptors;
+  StreamApplicationDescriptor streamApplicationDescriptor;
+  SamzaPipelineOptions pipelineOptions;
+  TransformHierarchy.Node node;
 
-  @Test
-  public void testRegisterInputMessageStreams() {
-    final PCollection output = mock(PCollection.class);
+  @Before
+  public void before() {
+    final GenericInputDescriptor testInputDescriptor =
+        new GenericSystemDescriptor("mockSystem", "mockFactoryClassName")
+            .getInputDescriptor("test-input-1", mock(Serde.class));
+    final MapFunction<Object, String> keyFn = m -> m.toString();
+    final MapFunction<Object, Object> valueFn = m -> m;
+    final String streamName = "testStream";
+    KVSerde<Object, Object> serde = KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>());
+    pipelineOptions = mock(SamzaPipelineOptions.class);
+    when(pipelineOptions.getEnableTransformMetrics()).thenReturn(false);
+    // Create a stream application descriptor with a partitionBy
+    streamApplicationDescriptor =
+        new StreamApplicationDescriptorImpl(
+            appDesc -> {
+              MessageStream inputStream = 
appDesc.getInputStream(testInputDescriptor);
+              inputStream.partitionBy(keyFn, valueFn, serde, streamName);
+            },
+            getConfig());
+    Map<PValue, String> idMap = new HashMap<>();
+    Set<String> nonUniqueStateIds = new HashSet<>();
+    translationContext =
+        new TranslationContext(
+            streamApplicationDescriptor, idMap, nonUniqueStateIds, 
pipelineOptions);
+
+    // Register the input message stream
+    output = mock(PCollection.class);
+    pTransform = mock(AppliedPTransform.class);
+    node = mock(TransformHierarchy.Node.class);
+    when(pTransform.getFullName()).thenReturn("mock-ptransform");
     List<String> topics = Arrays.asList("stream1", "stream2");
-    List inputDescriptors =
+    inputDescriptors =
         topics.stream()
             .map(topicName -> createSamzaInputDescriptor(topicName, topicName))
             .collect(Collectors.toList());
+  }
 
+  @Test
+  public void testRegisterInputMessageStreams() {
+    translationContext.setCurrentTransform(pTransform);
     translationContext.registerInputMessageStreams(output, inputDescriptors);
-
     assertNotNull(translationContext.getMessageStream(output));
   }
 
-  public GenericInputDescriptor<KV<String, OpMessage<?>>> 
createSamzaInputDescriptor(
+  @Test
+  public void testMetricOpNotAttachedForIOTransform() {

Review Comment:
   Let's remove testMetricOpNotAttachedForIOTransform() and
testMetricOpNotAttachedForConfigOveride(), along with the PowerMock usage,
altogether. These tests don't add much value, and they pull the heavyweight
PowerMock dependency into the tests.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaTransformMetricRegistry is a registry that maintains the metrics for 
each transform. It
+ * maintains the average arrival time for each PCollection for a primitive 
transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time 
is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and 
updated in
+ * avgArrivalTimeMap
+ */
+public class SamzaTransformMetricRegistry implements Serializable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaTransformMetricRegistry.class);
+
+  // TransformName -> PValue for pCollection -> Map<WatermarkId, 
AvgArrivalTime>
+  private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
ConcurrentHashMap<Long, Long>>>
+      avgArrivalTimeMap;
+
+  // Per Transform Metrics for each primitive transform
+  private final SamzaTransformMetrics transformMetrics;
+
+  public SamzaTransformMetricRegistry() {
+    this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+    this.transformMetrics = new SamzaTransformMetrics();
+  }
+
+  public void register(String transformFullName, String pValue, Context ctx) {
+    transformMetrics.register(transformFullName, ctx);
+    // initialize the map for the transform
+    avgArrivalTimeMap.putIfAbsent(transformFullName, new 
ConcurrentHashMap<>());
+    avgArrivalTimeMap.get(transformFullName).putIfAbsent(pValue, new 
ConcurrentHashMap<>());
+  }
+
+  public SamzaTransformMetrics getTransformMetrics() {
+    return transformMetrics;
+  }
+
+  public void updateArrivalTimeMap(String transformName, String pValue, long 
watermark, long avg) {
+    if (avgArrivalTimeMap.get(transformName) != null
+        && avgArrivalTimeMap.get(transformName).get(pValue) != null) {
+      ConcurrentHashMap<Long, Long> avgArrivalTimeMapForPValue =
+          avgArrivalTimeMap.get(transformName).get(pValue);
+      // update the average arrival time for the latest watermark
+      avgArrivalTimeMapForPValue.put(watermark, avg);
+      // remove any stale entries which are lesser than the watermark
+      // todo: check is this safe to do here input metric op may be ahead in 
watermark than output?
+      // why not do it at emission time?
+      avgArrivalTimeMapForPValue.entrySet().removeIf(entry -> entry.getKey() < 
watermark);
+    }
+  }
+
+  // Checker framework bug: 
https://github.com/typetools/checker-framework/issues/979
+  @SuppressWarnings("return")
+  public void emitLatencyMetric(
+      String transformName,
+      List<String> inputs,
+      List<String> outputs,
+      Long watermark,
+      String taskName) {
+    final ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> 
avgArrivalTimeMapForTransform =
+        avgArrivalTimeMap.get(transformName);
+
+    if (avgArrivalTimeMapForTransform == null || inputs.isEmpty() || 
outputs.isEmpty()) {
+      return;
+    }
+
+    // get the avg arrival times for all the input PValues
+    final List<Long> inputPValuesAvgArrivalTimes =
+        inputs.stream()
+            .map(avgArrivalTimeMapForTransform::get)
+            .map(map -> map == null ? null : map.remove(watermark))
+            .filter(avgArrivalTime -> avgArrivalTime != null)
+            .collect(Collectors.toList());
+
+    // get the avg arrival times for all the output PValues
+    final List<Long> outputPValuesAvgArrivalTimes =
+        outputs.stream()
+            .map(avgArrivalTimeMapForTransform::get)
+            .map(map -> map == null ? null : map.remove(watermark))
+            .filter(avgArrivalTime -> avgArrivalTime != null)
+            .collect(Collectors.toList());
+
+    if (inputPValuesAvgArrivalTimes.isEmpty() || 
outputPValuesAvgArrivalTimes.isEmpty()) {
+      LOG.debug(
+          "Failure to Emit Metric for Transform: {} inputArrivalTime: {} or 
outputArrivalTime: {} not found for Watermark: {} Task: {}",
+          transformName,
+          inputPValuesAvgArrivalTimes,
+          inputPValuesAvgArrivalTimes,
+          watermark,
+          taskName);
+      return;
+    }
+
+    final long startTime = Collections.min(inputPValuesAvgArrivalTimes);
+    final long endTime = Collections.max(inputPValuesAvgArrivalTimes);

Review Comment:
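   Hmm, is this supposed to be outputPValuesAvgArrivalTimes? (The debug log above also passes
inputPValuesAvgArrivalTimes twice; the second argument presumably should be outputPValuesAvgArrivalTimes too.)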



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to