xinyuiscool commented on code in PR #26437:
URL: https://github.com/apache/beam/pull/26437#discussion_r1180867475
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/TestSamzaRunner.java:
##########
@@ -30,22 +31,33 @@
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
/** Test {@link SamzaRunner}. */
public class TestSamzaRunner extends PipelineRunner<PipelineResult> {
private final SamzaRunner delegate;
+ private static InMemoryMetricsReporter testMetricsReporter = new InMemoryMetricsReporter();
Review Comment:
I think we should add this reporter to the SamzaPipelineOptions when running
the corresponding tests, instead of hard-coding it in TestSamzaRunner. I don't
think we need to change the test runner code.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamTransformMetricRegistry.class);
+
+ // TransformName -> PValue for pCollection -> Map<WatermarkId, AvgArrivalTime>
+ private ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>>>
Review Comment:
Why do we need the transform name here? If you just keep {PValue_id ->
{watermark -> time}}, it seems it should still work.
Also, mark it final.
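A minimal sketch of the flattening suggested above, keeping only {PValue_id -> {watermark -> time}} in a final field. FlatArrivalTimeRegistry and its methods are hypothetical names, not the PR's API, and the BeamTransformMetrics registration is left out. One thing worth checking before adopting it: because takeArrivalTimes removes entries, a PCollection that is the output of one transform and the input of the next would be consumed by whichever transform emits first, which may be why the PR keys by transform name.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * Hypothetical, simplified registry keyed only by PValue id, as the review suggests. Names are
 * illustrative, not taken from the PR.
 */
class FlatArrivalTimeRegistry {
  // PValue id -> (watermark -> average arrival time); final, as the review asks.
  private final ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMap =
      new ConcurrentHashMap<>();

  /** Creates the per-watermark map for a PValue if it does not exist yet. */
  void register(String pValue) {
    avgArrivalTimeMap.putIfAbsent(pValue, new ConcurrentHashMap<>());
  }

  /** Records the average arrival time of a registered PValue for one watermark. */
  void updateArrivalTime(String pValue, long watermark, long avg) {
    final ConcurrentHashMap<Long, Long> perWatermark = avgArrivalTimeMap.get(pValue);
    if (perWatermark != null) {
      perWatermark.put(watermark, avg);
    }
  }

  /** Removes and returns the arrival times recorded for the given PValues at a watermark. */
  List<Long> takeArrivalTimes(List<String> pValues, long watermark) {
    return pValues.stream()
        .map(avgArrivalTimeMap::get)
        .filter(Objects::nonNull)
        .map(perWatermark -> perWatermark.remove(watermark))
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
  }
}
```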
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaInputMetricOp.java:
##########
@@ -0,0 +1,89 @@
+package org.apache.beam.runners.samza.runtime;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaInputMetricOp is a {@link SamzaMetricOp} that emits & maintains default transform metrics
+ * for input PCollection to the transform. It emits the input throughput and maintains avg arrival
+ * time for input PCollection per watermark.
+ *
+ * <p>Assumes that {@code SamzaInputMetricOp#processWatermark(Instant, OpEmitter)} is exclusive of
+ * {@code SamzaInputMetricOp#processElement(WindowedValue, OpEmitter)}. Specifically, the
+ * processWatermark method assumes that no calls to processElement will be made during its
+ * execution, and vice versa.
+ *
+ * @param <T> The type of the elements in the input PCollection.
+ */
+public class SamzaInputMetricOp<T> extends SamzaMetricOp<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(SamzaInputMetricOp.class);
+ // Counters to maintain avg arrival time per watermark for input PCollection.
+ private AtomicLong count;
Review Comment:
final
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -145,9 +152,25 @@ protected <KeyT, OutT> void registerInputMessageStreams(
}
public <OutT> void registerMessageStream(PValue pvalue, MessageStream<OpMessage<OutT>> stream) {
+ registerMessageStream(pvalue, stream, true);
+ }
+
+ public <OutT> void registerMessageStream(
+ PValue pvalue, MessageStream<OpMessage<OutT>> stream, boolean enableTransformMetric) {
if (messsageStreams.containsKey(pvalue)) {
throw new IllegalArgumentException("Stream already registered for pvalue: " + pvalue);
}
+ // add a step to attach OutputMetricOp if registered for Op Stream
+ final Config overrideConfig = new MapConfig(getPipelineOptions().getConfigOverride());
Review Comment:
Don't use the config override; use PipelineOptions instead. We already have an option to
enable/disable metrics, so please add one more for transform metrics.
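A minimal sketch of the suggested direction, assuming a new boolean option (called getEnableTransformMetrics here, a hypothetical name) is read from typed options instead of parsing getConfigOverride(). The TransformMetricsOptions interface only stands in for the options type so the sketch compiles without Beam; in Beam itself, options are declared as getter/setter pairs on an interface extending PipelineOptions, typically annotated with @Description and @Default.Boolean.

```java
/**
 * Hypothetical stand-in for a new getter on SamzaPipelineOptions. The option name is an
 * assumption, not an existing Beam option.
 */
interface TransformMetricsOptions {
  boolean getEnableTransformMetrics();
}

/** Hypothetical sketch of the decision registerMessageStream could make from typed options. */
final class TransformMetricGate {
  private TransformMetricGate() {}

  /**
   * Attach metric ops only if the pipeline-wide option is on and this particular stream did not
   * opt out through the enableTransformMetric parameter of the new overload.
   */
  static boolean shouldAttachMetricOp(
      TransformMetricsOptions options, boolean enableTransformMetric) {
    return options.getEnableTransformMetrics() && enableTransformMetric;
  }
}
```

Keeping the per-stream boolean alongside the global option lets callers such as the existing two-argument overload keep passing true while the option alone decides the default.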
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java:
##########
@@ -292,4 +302,29 @@ static Map<String, Map.Entry<String, String>> buildTransformIOMap(
private static Supplier<List<String>> getIOPValueList(Map<TupleTag<?>, PCollection<?>> map) {
return () -> map.values().stream().map(pColl -> pColl.getName()).collect(Collectors.toList());
}
+
+ // Reads the config to build transformIOMap, i.e. map of inputs & output PValues for each
+ // PTransform
+ public static Map<String, Map.Entry<String, String>> getTransformIOMap(Config config) {
+ checkNotNull(config, "Config cannot be null");
+ final Map<String, Map.Entry<String, String>> result = new HashMap<>();
+ final String pipelineJsonGraph = config.get(SamzaRunner.BEAM_JSON_GRAPH);
+ if (pipelineJsonGraph == null) {
+ LOG.warn(
+ "Cannot build transformIOMap since Config: {} is found null ",
+ SamzaRunner.BEAM_JSON_GRAPH);
+ return result;
+ }
+ JsonObject jsonObject = JsonParser.parseString(pipelineJsonGraph).getAsJsonObject();
+ JsonArray transformIOInfo = jsonObject.getAsJsonArray("transformIOInfo");
+ transformIOInfo.forEach(
+ transform -> {
+ final String transformName =
+ transform.getAsJsonObject().get("transformName").getAsString();
+ final String inputs = transform.getAsJsonObject().get("inputs").getAsString();
Review Comment:
Can we directly get the inputs/outputs as a list here? Serialize/deserialize them in the JSON
as a list, instead of manipulating the delimiter.
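A small illustration of the failure modes behind this suggestion, assuming for the sketch that names are joined with a comma (the PR's actual delimiter is not visible here). Two cases break: a transform with no inputs comes back as a one-element list holding the empty string, because String.split returns the original string when there is no match, so a check like inputs.isEmpty() would not fire on such a list; and a name that itself contains the delimiter is split in two. Storing inputs and outputs as JSON arrays and reading them with Gson's JsonObject#getAsJsonArray, as the code above already does for transformIOInfo, keeps element boundaries intact.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical illustration of round-tripping PValue names through one delimited string. The ","
 * delimiter is an assumption for the sketch.
 */
final class DelimitedNames {
  private DelimitedNames() {}

  /** Joins names with a delimiter and splits them back, as a delimiter-based serde would. */
  static List<String> roundTrip(List<String> names) {
    final String joined = String.join(",", names);
    return Arrays.asList(joined.split(","));
  }
}
```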
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamTransformMetricRegistry.class);
+
+ // TransformName -> PValue for pCollection -> Map<WatermarkId, AvgArrivalTime>
+ private ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>>>
+ avgArrivalTimeMap;
+
+ // Per Transform Metrics for each primitive transform
+ private final BeamTransformMetrics transformMetrics;
+
+ public BeamTransformMetricRegistry() {
+ this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+ this.transformMetrics = new BeamTransformMetrics();
+ }
+
+ public void register(String transformFullName, String pValue, Context ctx) {
+ transformMetrics.register(transformFullName, ctx);
+ // initialize the map for the transform
+ avgArrivalTimeMap.putIfAbsent(transformFullName, new ConcurrentHashMap<>());
+ avgArrivalTimeMap.get(transformFullName).putIfAbsent(pValue, new ConcurrentHashMap<>());
+ }
+
+ public BeamTransformMetrics getTransformMetrics() {
+ return transformMetrics;
+ }
+
+ public void updateArrivalTimeMap(String transformName, String pValue, long watermark, long avg) {
+ if (avgArrivalTimeMap.get(transformName) != null
+ && avgArrivalTimeMap.get(transformName).get(pValue) != null) {
+ ConcurrentHashMap<Long, Long> avgArrivalTimeMapForPValue =
+ avgArrivalTimeMap.get(transformName).get(pValue);
+ // update the average arrival time for the latest watermark
+ avgArrivalTimeMapForPValue.put(watermark, avg);
+ // remove any stale entries which are lesser than the watermark
+ // todo: check is this safe to do here input metric op may be ahead in watermark than output?
+ // why not do it at emission time?
+ avgArrivalTimeMapForPValue.entrySet().removeIf(entry -> entry.getKey() < watermark);
+ }
+ }
+
+ // Checker framework bug: https://github.com/typetools/checker-framework/issues/979
+ @SuppressWarnings("return")
+ public void emitLatencyMetric(
+ String transformName,
+ List<String> inputs,
+ List<String> outputs,
+ Long watermark,
+ String taskName) {
+ ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform =
+ avgArrivalTimeMap.get(transformName);
+
+ if (avgArrivalTimeMapForTransform == null || inputs.isEmpty() || outputs.isEmpty()) {
+ return;
+ }
+
+ // get the avg arrival times for all the input PValues
+ List<Long> inputPValuesAvgArrivalTimes =
+ inputs.stream()
+ .map(avgArrivalTimeMapForTransform::get)
+ .map(map -> map == null ? null : map.remove(watermark))
+ .filter(avgArrivalTime -> avgArrivalTime != null)
+ .collect(Collectors.toList());
+
+ // get the avg arrival times for all the output PValues
+ List<Long> outputPValuesAvgArrivalTimes =
Review Comment:
final
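The quoted updateArrivalTimeMap carries a todo asking whether stale entries should be removed at emission time rather than on every update. One possible shape, sketched here with hypothetical names, keeps each PValue's arrival times in a ConcurrentSkipListMap so that the entry for a watermark is taken and everything older is pruned in one place. Whether this is safe when input and output metric ops progress at different watermarks still has to be checked against the runner's watermark semantics; the sketch only shows the mechanics.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * Hypothetical per-PValue store that prunes stale watermarks when the latency metric is emitted,
 * not on every update. Names are illustrative, not the PR's.
 */
class WatermarkArrivalTimes {
  // Sorted by watermark so all older entries can be dropped through a single range view.
  private final ConcurrentNavigableMap<Long, Long> avgArrivalTimeByWatermark =
      new ConcurrentSkipListMap<>();

  /** Records (or overwrites) the average arrival time for a watermark. */
  void update(long watermark, long avgArrivalTime) {
    avgArrivalTimeByWatermark.put(watermark, avgArrivalTime);
  }

  /** Removes and returns the entry for this watermark (or null) and drops all older entries. */
  Long takeAndPrune(long watermark) {
    final Long avgArrivalTime = avgArrivalTimeByWatermark.remove(watermark);
    // headMap(w) is a live view of keys strictly below w; clearing it removes them from the map.
    avgArrivalTimeByWatermark.headMap(watermark).clear();
    return avgArrivalTime;
  }

  int size() {
    return avgArrivalTimeByWatermark.size();
  }
}
```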
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamTransformMetricRegistry.class);
+
+ // TransformName -> PValue for pCollection -> Map<WatermarkId, AvgArrivalTime>
+ private ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>>>
+ avgArrivalTimeMap;
+
+ // Per Transform Metrics for each primitive transform
+ private final BeamTransformMetrics transformMetrics;
+
+ public BeamTransformMetricRegistry() {
+ this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+ this.transformMetrics = new BeamTransformMetrics();
+ }
+
+ public void register(String transformFullName, String pValue, Context ctx) {
+ transformMetrics.register(transformFullName, ctx);
+ // initialize the map for the transform
+ avgArrivalTimeMap.putIfAbsent(transformFullName, new ConcurrentHashMap<>());
+ avgArrivalTimeMap.get(transformFullName).putIfAbsent(pValue, new ConcurrentHashMap<>());
+ }
+
+ public BeamTransformMetrics getTransformMetrics() {
+ return transformMetrics;
+ }
+
+ public void updateArrivalTimeMap(String transformName, String pValue, long watermark, long avg) {
+ if (avgArrivalTimeMap.get(transformName) != null
+ && avgArrivalTimeMap.get(transformName).get(pValue) != null) {
+ ConcurrentHashMap<Long, Long> avgArrivalTimeMapForPValue =
+ avgArrivalTimeMap.get(transformName).get(pValue);
+ // update the average arrival time for the latest watermark
+ avgArrivalTimeMapForPValue.put(watermark, avg);
+ // remove any stale entries which are lesser than the watermark
+ // todo: check is this safe to do here input metric op may be ahead in watermark than output?
+ // why not do it at emission time?
+ avgArrivalTimeMapForPValue.entrySet().removeIf(entry -> entry.getKey() < watermark);
+ }
+ }
+
+ // Checker framework bug: https://github.com/typetools/checker-framework/issues/979
+ @SuppressWarnings("return")
+ public void emitLatencyMetric(
+ String transformName,
+ List<String> inputs,
+ List<String> outputs,
+ Long watermark,
+ String taskName) {
+ ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform =
Review Comment:
final
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamTransformMetricRegistry.class);
+
+ // TransformName -> PValue for pCollection -> Map<WatermarkId, AvgArrivalTime>
+ private ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>>>
+ avgArrivalTimeMap;
+
+ // Per Transform Metrics for each primitive transform
+ private final BeamTransformMetrics transformMetrics;
+
+ public BeamTransformMetricRegistry() {
+ this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+ this.transformMetrics = new BeamTransformMetrics();
+ }
+
+ public void register(String transformFullName, String pValue, Context ctx) {
+ transformMetrics.register(transformFullName, ctx);
+ // initialize the map for the transform
+ avgArrivalTimeMap.putIfAbsent(transformFullName, new ConcurrentHashMap<>());
+ avgArrivalTimeMap.get(transformFullName).putIfAbsent(pValue, new ConcurrentHashMap<>());
+ }
+
+ public BeamTransformMetrics getTransformMetrics() {
+ return transformMetrics;
+ }
+
+ public void updateArrivalTimeMap(String transformName, String pValue, long watermark, long avg) {
+ if (avgArrivalTimeMap.get(transformName) != null
+ && avgArrivalTimeMap.get(transformName).get(pValue) != null) {
+ ConcurrentHashMap<Long, Long> avgArrivalTimeMapForPValue =
+ avgArrivalTimeMap.get(transformName).get(pValue);
+ // update the average arrival time for the latest watermark
+ avgArrivalTimeMapForPValue.put(watermark, avg);
+ // remove any stale entries which are lesser than the watermark
+ // todo: check is this safe to do here input metric op may be ahead in watermark than output?
+ // why not do it at emission time?
+ avgArrivalTimeMapForPValue.entrySet().removeIf(entry -> entry.getKey() < watermark);
+ }
+ }
+
+ // Checker framework bug: https://github.com/typetools/checker-framework/issues/979
+ @SuppressWarnings("return")
+ public void emitLatencyMetric(
+ String transformName,
+ List<String> inputs,
+ List<String> outputs,
+ Long watermark,
+ String taskName) {
+ ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform =
+ avgArrivalTimeMap.get(transformName);
+
+ if (avgArrivalTimeMapForTransform == null || inputs.isEmpty() || outputs.isEmpty()) {
+ return;
+ }
+
+ // get the avg arrival times for all the input PValues
+ List<Long> inputPValuesAvgArrivalTimes =
+ inputs.stream()
+ .map(avgArrivalTimeMapForTransform::get)
+ .map(map -> map == null ? null : map.remove(watermark))
+ .filter(avgArrivalTime -> avgArrivalTime != null)
+ .collect(Collectors.toList());
+
+ // get the avg arrival times for all the output PValues
+ List<Long> outputPValuesAvgArrivalTimes =
+ outputs.stream()
+ .map(avgArrivalTimeMapForTransform::get)
+ .map(map -> map == null ? null : map.remove(watermark))
+ .filter(avgArrivalTime -> avgArrivalTime != null)
+ .collect(Collectors.toList());
+
+ if (inputPValuesAvgArrivalTimes.isEmpty() || outputPValuesAvgArrivalTimes.isEmpty()) {
+ LOG.debug(
+ "Failure to Emit Metric for Transform: {} inputArrivalTime: {} or outputArrivalTime: {} not found for Watermark: {} Task: {}",
+ transformName,
+ inputPValuesAvgArrivalTimes,
+ outputPValuesAvgArrivalTimes,
+ watermark,
+ taskName);
+ return;
+ }
+
+ long startTime = Collections.min(inputPValuesAvgArrivalTimes);
Review Comment:
final for all vars below.
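A sketch of how the tail of emitLatencyMetric could look with every local declared final, as requested. Only the first line (startTime as the minimum of the input averages) is visible in the quoted diff; computing endTime as the maximum of the output averages and latency as their difference is an assumption made for illustration, and LatencySketch is a hypothetical name.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the end of emitLatencyMetric, with every local declared final. */
final class LatencySketch {
  private LatencySketch() {}

  /**
   * Assumption: latency is the latest average output arrival time minus the earliest average input
   * arrival time. Only the min over inputs appears in the quoted diff.
   */
  static long latency(List<Long> inputAvgArrivalTimes, List<Long> outputAvgArrivalTimes) {
    final long startTime = Collections.min(inputAvgArrivalTimes);
    final long endTime = Collections.max(outputAvgArrivalTimes);
    return endTime - startTime;
  }
}
```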
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamTransformMetricRegistry.class);
+
+ // TransformName -> PValue for pCollection -> Map<WatermarkId, AvgArrivalTime>
+ private ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>>>
+ avgArrivalTimeMap;
+
+ // Per Transform Metrics for each primitive transform
+ private final BeamTransformMetrics transformMetrics;
+
+ public BeamTransformMetricRegistry() {
+ this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+ this.transformMetrics = new BeamTransformMetrics();
+ }
+
+ public void register(String transformFullName, String pValue, Context ctx) {
+ transformMetrics.register(transformFullName, ctx);
+ // initialize the map for the transform
+ avgArrivalTimeMap.putIfAbsent(transformFullName, new ConcurrentHashMap<>());
+ avgArrivalTimeMap.get(transformFullName).putIfAbsent(pValue, new ConcurrentHashMap<>());
+ }
+
+ public BeamTransformMetrics getTransformMetrics() {
+ return transformMetrics;
+ }
+
+ public void updateArrivalTimeMap(String transformName, String pValue, long watermark, long avg) {
+ if (avgArrivalTimeMap.get(transformName) != null
+ && avgArrivalTimeMap.get(transformName).get(pValue) != null) {
+ ConcurrentHashMap<Long, Long> avgArrivalTimeMapForPValue =
+ avgArrivalTimeMap.get(transformName).get(pValue);
+ // update the average arrival time for the latest watermark
+ avgArrivalTimeMapForPValue.put(watermark, avg);
+ // remove any stale entries which are lesser than the watermark
+ // todo: check is this safe to do here input metric op may be ahead in watermark than output?
+ // why not do it at emission time?
+ avgArrivalTimeMapForPValue.entrySet().removeIf(entry -> entry.getKey() < watermark);
+ }
+ }
+
+ // Checker framework bug: https://github.com/typetools/checker-framework/issues/979
+ @SuppressWarnings("return")
+ public void emitLatencyMetric(
+ String transformName,
+ List<String> inputs,
+ List<String> outputs,
+ Long watermark,
+ String taskName) {
+ ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform =
+ avgArrivalTimeMap.get(transformName);
+
+ if (avgArrivalTimeMapForTransform == null || inputs.isEmpty() || outputs.isEmpty()) {
+ return;
+ }
+
+ // get the avg arrival times for all the input PValues
+ List<Long> inputPValuesAvgArrivalTimes =
Review Comment:
final
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/TestSamzaRunner.java:
##########
@@ -91,4 +105,27 @@ public PipelineResult run(Pipeline pipeline) {
throw t;
}
}
+
+ public static class InMemoryMetricsReporter implements MetricsReporter {
Review Comment:
Move this out into a util package or similar. Add Javadoc for every class.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricOp.java:
##########
@@ -0,0 +1,96 @@
+package org.apache.beam.runners.samza.runtime;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
+
+/**
+ * MetricOp for default throughput, latency & watermark progress metrics per transform for Beam Samza
+ * Runner. A MetricOp can be attached either to an input PCollection or to an output PCollection of
+ * a PTransform.
+ *
+ * <p>A MetricOp is created per primitive PTransform per PCollection across its inputs & outputs.
+ * 1. An independent MetricOp is created and attached to each input PCollection of the PTransform.
+ * 2. An independent MetricOp is created and attached to each output PCollection of the PTransform.
+ *
+ * <p>Each concrete MetricOp is responsible for the following metrics computation: 1. Throughput:
+ * Emit the number of elements processed in the PCollection 2. Watermark Progress: Emit the
+ * watermark progress of the PCollection 3. Latency: Maintain the avg arrival time per watermark
+ * across elements it processes, compute & emit the latency
+ *
+ * @param <T> type of the message
+ */
+public abstract class SamzaMetricOp<T> implements Op<T, T, Void> {
+ // Unique name of the PTransform this MetricOp is associated with
+ protected final String transformFullName;
+ protected final BeamTransformMetricRegistry beamTransformMetricRegistry;
+ // Name or identifier of the PCollection which the PTransform is processing
+ protected final String pValue;
+ // List of input PValue(s) for all PCollections processing the PTransform
+ protected List<String> transformInputs;
Review Comment:
transient
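Why transient fits here, as a sketch: assuming SamzaMetricOp is serialized along with the rest of the op graph (the Samza runner's Op interface extends Serializable), a field that is only populated at runtime, such as transformInputs derived from the config, can be marked transient so it is skipped during serialization and rebuilt in init. MetricOpSketch is a hypothetical stand-in that shows this with plain Java serialization; how the real op fills the field is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.List;

/**
 * Hypothetical stand-in for SamzaMetricOp: the transform name is serialized with the op, while
 * transformInputs is transient and rebuilt in init().
 */
class MetricOpSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  final String transformFullName;
  // Skipped by Java serialization; repopulated when the op is initialized at runtime.
  transient List<String> transformInputs;

  MetricOpSketch(String transformFullName) {
    this.transformFullName = transformFullName;
  }

  /** Assumption: the real op derives its inputs from the pipeline JSON in the Samza config. */
  void init(List<String> inputsFromConfig) {
    this.transformInputs = inputsFromConfig;
  }

  /** Serializes and then deserializes the op with plain Java serialization. */
  static MetricOpSketch roundTrip(MetricOpSketch op) {
    try {
      final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(op);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (MetricOpSketch) in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```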
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetrics.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.SlidingTimeWindowReservoir;
+import org.apache.samza.metrics.Timer;
+
+/**
+ * Metrics like throughput, latency and watermark progress for each Beam transform for Samza Runner.
+ */
+@SuppressWarnings("return")
+public class BeamTransformMetrics implements Serializable {
Review Comment:
s/Beam/Samza
Same question: this class seems to have a few ConcurrentHashMaps; are those
serializable?
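On the serializability question: java.util.concurrent.ConcurrentHashMap itself implements Serializable, so the map type is not the problem; what matters is whether the keys and values it holds are serializable at the moment the object is written. The hypothetical helper below checks this with plain Java serialization. FakeCounter only stands in for a metric object; whether Samza's Counter, Gauge and Timer implement Serializable is worth confirming separately. If the maps are only filled after deserialization (for example in a register call made at init time), they would serialize as empty maps, but marking them transient and creating them at init makes that intent explicit.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

/** Hypothetical helper: reports whether an object graph can be written with Java serialization. */
final class SerializabilityCheck {
  private SerializabilityCheck() {}

  static boolean isSerializable(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      // Thrown when some object reachable from o does not implement Serializable.
      return false;
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Stand-in for a metric object (e.g. a counter) that does not implement Serializable. */
  static final class FakeCounter {
    long count;
  }
}
```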
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaInputMetricOp.java:
##########
@@ -0,0 +1,89 @@
+package org.apache.beam.runners.samza.runtime;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaInputMetricOp is a {@link SamzaMetricOp} that emits & maintains default transform metrics
+ * for input PCollection to the transform. It emits the input throughput and maintains avg arrival
+ * time for input PCollection per watermark.
+ *
+ * <p>Assumes that {@code SamzaInputMetricOp#processWatermark(Instant, OpEmitter)} is exclusive of
+ * {@code SamzaInputMetricOp#processElement(WindowedValue, OpEmitter)}. Specifically, the
+ * processWatermark method assumes that no calls to processElement will be made during its
+ * execution, and vice versa.
+ *
+ * @param <T> The type of the elements in the input PCollection.
+ */
+public class SamzaInputMetricOp<T> extends SamzaMetricOp<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(SamzaInputMetricOp.class);
+ // Counters to maintain avg arrival time per watermark for input PCollection.
+ private AtomicLong count;
+ private AtomicReference<BigInteger> sumOfTimestamps;
Review Comment:
final
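A minimal sketch of what `final` buys here, assuming the counters are initialized once at declaration and only their contents change afterwards. `ArrivalTimeTracker`, `onElement` and `averageAndReset` are hypothetical names, not the PR's API. Like the class Javadoc above, it assumes watermark handling never runs concurrently with element handling.

```java
import java.math.BigInteger;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: the counter references are final, only their values change.
class ArrivalTimeTracker {
  // Number of elements seen since the last watermark.
  private final AtomicLong count = new AtomicLong();
  // Running sum of arrival timestamps (epoch millis); BigInteger avoids long overflow.
  private final AtomicReference<BigInteger> sumOfTimestamps =
      new AtomicReference<>(BigInteger.ZERO);

  // Per-element path: record one arrival.
  void onElement(long arrivalTimeMillis) {
    count.incrementAndGet();
    sumOfTimestamps.updateAndGet(sum -> sum.add(BigInteger.valueOf(arrivalTimeMillis)));
  }

  // Per-watermark path: returns the average arrival time and resets both counters.
  // The two getAndSet calls are not jointly atomic; this relies on onElement
  // never running at the same time, as the Javadoc above assumes.
  OptionalLong averageAndReset() {
    long n = count.getAndSet(0);
    BigInteger sum = sumOfTimestamps.getAndSet(BigInteger.ZERO);
    if (n == 0) {
      return OptionalLong.empty();
    }
    return OptionalLong.of(sum.divide(BigInteger.valueOf(n)).longValue());
  }
}
```

Keeping the sum in a `BigInteger`, as the PR does, means even timestamps near `Long.MAX_VALUE` average correctly instead of overflowing.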
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaInputMetricOp.java:
##########
@@ -0,0 +1,89 @@
+package org.apache.beam.runners.samza.runtime;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaInputMetricOp is a {@link SamzaMetricOp} that emits & maintains default transform metrics
+ * for input PCollection to the transform. It emits the input throughput and maintains avg arrival
+ * time for input PCollection per watermark.
+ *
+ * <p>Assumes that {@code SamzaInputMetricOp#processWatermark(Instant, OpEmitter)} is exclusive of
+ * {@code SamzaInputMetricOp#processElement(Instant, OpEmitter)}. Specifically, the processWatermark
+ * method assumes that no calls to processElement will be made during its execution, and vice versa.
+ *
+ * @param <T> The type of the elements in the input PCollection.
+ */
+public class SamzaInputMetricOp<T> extends SamzaMetricOp<T> {
Review Comment:
Move this class and the other related metric ops to the metrics package.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricOp.java:
##########
@@ -0,0 +1,96 @@
+package org.apache.beam.runners.samza.runtime;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
+
+/**
+ * MetricOp for default throughput, latency & watermark progress metric per transform for Beam Samza
+ * Runner. A MetricOp can be either attached to Input PCollection or Output PCollection of a
+ * PTransform.
+ *
+ * <p>A MetricOp is created per primitive PTransform per PCollection its across its inputs &
+ * outputs. 1. An independent MetricOp is created and attached to each input PCollection to the
+ * PTransform. 2. An independent MetricOp is created and attached to each input PCollection to the
+ * PTransform.
+ *
+ * <p>Each concrete MetricOp is responsible for following metrics computation: 1. Throughput: Emit
+ * the number of elements processed in the PCollection 2. Watermark Progress: Emit the watermark
+ * progress of the PCollection 3. Latency: Maintain the avg arrival time per watermark across
+ * elements it processes, compute & emit the latency
+ *
+ * @param <T> type of the message
+ */
+public abstract class SamzaMetricOp<T> implements Op<T, T, Void> {
+ // Unique name of the PTransform this MetricOp is associated with
+ protected final String transformFullName;
+ protected final BeamTransformMetricRegistry beamTransformMetricRegistry;
+ // Name or identifier of the PCollection which Ptrasform is processing
+ protected final String pValue;
+ // List of input PValue(s) for all PCollections processing the PTransform
+ protected List<String> transformInputs;
+ // List of output PValue(s) for all PCollections processing the PTransform
+ protected List<String> transformOutputs;
Review Comment:
transient
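A small illustration of the `transient` suggestion, assuming the lists are derived state that each op can rebuild in `init` on the worker rather than carry inside the serialized op. `MetricOpSketch` is a hypothetical stand-in for `SamzaMetricOp`. After deserialization the transient field is `null` until `init` repopulates it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a serializable op with runtime-only state.
class MetricOpSketch implements Serializable {
  private static final long serialVersionUID = 1L;
  // Written out with the op.
  final String transformFullName;
  // Skipped by Java serialization; rebuilt in init().
  transient List<String> transformInputs;

  MetricOpSketch(String transformFullName) {
    this.transformFullName = transformFullName;
  }

  // Plays the role of Op#init: recompute derived state after deserialization.
  void init(String... inputs) {
    this.transformInputs = Arrays.asList(inputs);
  }

  // Serializes and deserializes the op, mimicking shipping it to a worker.
  static MetricOpSketch roundTrip(MetricOpSketch op) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(op);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (MetricOpSketch) in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```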
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {
Review Comment:
s/Beam/Samza
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -145,9 +152,25 @@ protected <KeyT, OutT> void registerInputMessageStreams(
}
public <OutT> void registerMessageStream(PValue pvalue, MessageStream<OpMessage<OutT>> stream) {
+ registerMessageStream(pvalue, stream, true);
+ }
+
+ public <OutT> void registerMessageStream(
+ PValue pvalue, MessageStream<OpMessage<OutT>> stream, boolean enableTransformMetric) {
Review Comment:
get rid of this boolean flag.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java:
##########
@@ -104,7 +104,7 @@ private static <K, InputT, OutputT> void doTranslate(
outputTag,
input.isBounded());
- ctx.registerMessageStream(output, outputStream);
+ ctx.registerMessageStream(output, outputStream, false);
Review Comment:
Why do we need to pass false here? We need to compute the transform metrics for GBK too, right?
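One way to read the flag comment above together with the GBK question: decide once, from the pipeline options, whether transform metrics are enabled, and then attach the metric step for every registered stream, GroupByKey output included. This is a toy sketch under that assumption. `TranslationContextSketch` is hypothetical, and streams are modeled as strings naming their operator chain so the control flow runs without Samza.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TranslationContext. A "stream" is just a string naming its
// operator chain, so the control flow can run without Samza.
class TranslationContextSketch {
  private final Map<String, String> streams = new HashMap<>();
  // Decided once (e.g. from pipeline options) instead of being passed on every call.
  private final boolean enableTransformMetrics;

  TranslationContextSketch(boolean enableTransformMetrics) {
    this.enableTransformMetrics = enableTransformMetrics;
  }

  // Single entry point with no per-call flag: a GBK output is treated like any other.
  void registerMessageStream(String pvalue, String stream) {
    streams.put(pvalue, enableTransformMetrics ? stream + " -> OutputMetricOp" : stream);
  }

  // Single lookup path: the input-side metric step is attached the same way everywhere.
  String getMessageStream(String pvalue) {
    String stream = streams.get(pvalue);
    if (stream == null) {
      throw new IllegalArgumentException("No stream registered for pvalue: " + pvalue);
    }
    return enableTransformMetrics ? stream + " -> InputMetricOp" : stream;
  }
}
```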
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {
Review Comment:
Why does this need to be serializable? I don't know whether this ConcurrentHashMap is serializable.
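On whether the map is serializable: `ConcurrentHashMap` implements `Serializable`, but serializing it also serializes every key and value, so it fails with `java.io.NotSerializableException` if any entry is not serializable. Below is a quick stdlib-only probe. `SerializabilityProbe` and `RuntimeHandle` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.ConcurrentHashMap;

class SerializabilityProbe {
  // Hypothetical non-serializable value, e.g. a handle to a runtime metrics object.
  static final class RuntimeHandle {}

  // Returns true if Java serialization of the whole object graph succeeds.
  static boolean isSerializable(Object value) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(value);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    ConcurrentHashMap<String, Object> plain = new ConcurrentHashMap<>();
    plain.put("avgArrivalTime", 42L);
    ConcurrentHashMap<String, Object> withHandle = new ConcurrentHashMap<>();
    withHandle.put("reporter", new RuntimeHandle());
    System.out.println(isSerializable(plain)); // true
    System.out.println(isSerializable(withHandle)); // false
  }
}
```

If the registry only holds runtime state that each task rebuilds anyway, one option is to keep it out of the serialized op entirely (for example as a `transient` field created in `init`), which sidesteps the question.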
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricOp.java:
##########
@@ -0,0 +1,96 @@
+package org.apache.beam.runners.samza.runtime;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
+
+/**
+ * MetricOp for default throughput, latency & watermark progress metric per transform for Beam Samza
+ * Runner. A MetricOp can be either attached to Input PCollection or Output PCollection of a
+ * PTransform.
+ *
+ * <p>A MetricOp is created per primitive PTransform per PCollection its across its inputs &
+ * outputs. 1. An independent MetricOp is created and attached to each input PCollection to the
+ * PTransform. 2. An independent MetricOp is created and attached to each input PCollection to the
+ * PTransform.
+ *
+ * <p>Each concrete MetricOp is responsible for following metrics computation: 1. Throughput: Emit
+ * the number of elements processed in the PCollection 2. Watermark Progress: Emit the watermark
+ * progress of the PCollection 3. Latency: Maintain the avg arrival time per watermark across
+ * elements it processes, compute & emit the latency
+ *
+ * @param <T> type of the message
+ */
+public abstract class SamzaMetricOp<T> implements Op<T, T, Void> {
+ // Unique name of the PTransform this MetricOp is associated with
+ protected final String transformFullName;
+ protected final BeamTransformMetricRegistry beamTransformMetricRegistry;
+ // Name or identifier of the PCollection which Ptrasform is processing
+ protected final String pValue;
+ // List of input PValue(s) for all PCollections processing the PTransform
+ protected List<String> transformInputs;
+ // List of output PValue(s) for all PCollections processing the PTransform
+ protected List<String> transformOutputs;
+ // Name of the task, for logging purpose
+ protected String task;
Review Comment:
transient
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -159,12 +182,29 @@ public MessageStream<OpMessage<String>> getDummyStream() {
}
public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(PValue pvalue) {
+ return getMessageStream(pvalue, true);
+ }
+
+ public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(
+ PValue pvalue, boolean enableTransformMetric) {
Review Comment:
get rid of this flag.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -283,4 +323,13 @@ public StoreIdGenerator getStoreIdGenerator() {
sendFn.accept(new EndOfStreamMessage(null));
return dummyInput;
}
+
+ boolean shouldDoAttachMetricOp(Config config, boolean enableTransformMetric) {
Review Comment:
remove this.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -159,12 +182,29 @@ public MessageStream<OpMessage<String>> getDummyStream() {
}
public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(PValue pvalue) {
+ return getMessageStream(pvalue, true);
+ }
+
+ public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(
+ PValue pvalue, boolean enableTransformMetric) {
@SuppressWarnings("unchecked")
final MessageStream<OpMessage<OutT>> stream =
(MessageStream<OpMessage<OutT>>) messsageStreams.get(pvalue);
if (stream == null) {
throw new IllegalArgumentException("No stream registered for pvalue: " + pvalue);
}
+
+ // add a step to attach InputMetricOp if registered for Op Stream
+ final Config overrideConfig = new MapConfig(getPipelineOptions().getConfigOverride());
Review Comment:
Use pipeline options instead of samza config.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaInputMetricOp.java:
##########
@@ -0,0 +1,89 @@
+package org.apache.beam.runners.samza.runtime;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaInputMetricOp is a {@link SamzaMetricOp} that emits & maintains default transform metrics
+ * for input PCollection to the transform. It emits the input throughput and maintains avg arrival
+ * time for input PCollection per watermark.
+ *
+ * <p>Assumes that {@code SamzaInputMetricOp#processWatermark(Instant, OpEmitter)} is exclusive of
+ * {@code SamzaInputMetricOp#processElement(Instant, OpEmitter)}. Specifically, the processWatermark
+ * method assumes that no calls to processElement will be made during its execution, and vice versa.
+ *
+ * @param <T> The type of the elements in the input PCollection.
+ */
+public class SamzaInputMetricOp<T> extends SamzaMetricOp<T> {
Review Comment:
This class looks almost the same as the OutputMetricsOP class. I don't
understand why we need three classes here (input, output, abstract). Let's
consolidate them into one. You can tell whether it's an input or output by
checking the input/output collection size.
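A minimal sketch of the consolidation, assuming each op is constructed with only the input list populated when it is attached to an input PCollection, and only the output list when it is attached to an output. `ConsolidatedMetricOp`, `Role` and the metric names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single metric op replacing the input/output/abstract trio.
class ConsolidatedMetricOp {
  enum Role {
    INPUT,
    OUTPUT
  }

  private final String transformFullName;
  private final Role role;
  // Throughput counter shared by both roles.
  private final AtomicLong elementCount = new AtomicLong();

  ConsolidatedMetricOp(String transformFullName, List<String> inputs, List<String> outputs) {
    // Exactly one side must be populated for the role to be unambiguous.
    if (inputs.isEmpty() == outputs.isEmpty()) {
      throw new IllegalArgumentException(
          "Expected exactly one of inputs/outputs to be non-empty for " + transformFullName);
    }
    this.transformFullName = transformFullName;
    this.role = inputs.isEmpty() ? Role.OUTPUT : Role.INPUT;
  }

  Role role() {
    return role;
  }

  // Common per-element path for both roles.
  void processElement() {
    elementCount.incrementAndGet();
  }

  long elementCount() {
    return elementCount.get();
  }

  // The only role-specific piece in this sketch: which metric the count is reported under.
  String throughputMetricName() {
    return transformFullName + (role == Role.INPUT ? "-input-throughput" : "-output-throughput");
  }
}
```

Role-specific behavior (for example, recording arrival times on the input side and computing latency on the output side) would branch on `role` in the same way instead of living in separate subclasses.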