johnjcasey commented on code in PR #29098:
URL: https://github.com/apache/beam/pull/29098#discussion_r1370294823
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java:
##########
@@ -181,6 +193,13 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
},
// onSuccess
c -> {
+ BigQuerySinkMetrics.FlushRowsCounter("ok").inc();
Review Comment:
I'd prefer not to have a magic string here; could "ok" come from a named constant or enum instead?
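A minimal sketch of one way to avoid the literal. The `RpcStatus` enum and its `ERROR` label are hypothetical and not part of this PR; only the "ok" label comes from the diff.

```java
/** Hypothetical status labels; only "ok" appears in the diff above. */
enum RpcStatus {
  OK("ok"),
  ERROR("error"); // assumed label, not taken from the PR

  private final String label;

  RpcStatus(String label) {
    this.label = label;
  }

  /** Returns the string used as the metric label. */
  String label() {
    return label;
  }
}
```

Assuming `FlushRowsCounter` keeps its `String` parameter, the call site could then read `BigQuerySinkMetrics.FlushRowsCounter(RpcStatus.OK.label()).inc();`.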
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java:
##########
@@ -126,6 +140,12 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, String>
rowsFinalized.inc(response.getRowCount());
finalizeOperationsSucceeded.inc();
+ BigQuerySinkMetrics.FinalizeStreamCounter("ok").inc();
+ @Nullable Instant operationStartTime = c.getOperationStartTime();
+ Instant operationEndTime = Instant.now();
+ BigQuerySinkMetrics.UpdateRpcLatencyMetric(
Review Comment:
Is updating the metric here accurate for failed operations?
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerCounter.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Internal;
+
+/** Implementation of {@link Counter} that delegates to the instance for the current context. */
+@Internal
+public class DelegatingPerWorkerCounter implements Metric, Counter, Serializable {
+  private final MetricName name;
+  private final boolean processWideContainer;
+
+  public DelegatingPerWorkerCounter(MetricName name) {
+    this(name, false);
+  }
+
+  public DelegatingPerWorkerCounter(MetricName name, boolean processWideContainer) {
+    this.name = name;
+    this.processWideContainer = processWideContainer;
+  }
+
+  /** Increment the counter. */
+  @Override
+  public void inc() {
+    inc(1);
+  }
+
+  /** Increment the counter by the given amount. */
+  @Override
+  public void inc(long n) {
+    MetricsContainer container =
+        this.processWideContainer
+            ? MetricsEnvironment.getProcessWideContainer()
+            : MetricsEnvironment.getCurrentContainer();
+    if (container != null) {
+      container.getPerWorkerCounter(name).inc(n);
Review Comment:
Do we need a separate implementation of the delegating counter just for this
one-line change?
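To illustrate the alternative, here is a rough sketch in which a single delegating counter takes a flag that selects the lookup. `SketchCounter`, `SketchContainer`, and `DelegatingCounterSketch` are hypothetical stand-ins so the example compiles on its own; they are not Beam's `Counter`, `MetricsContainer`, or `DelegatingCounter`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Stand-in for a counter, reduced to one method for this sketch. */
interface SketchCounter {
  void inc(long n);
}

/** Stand-in for a metrics container with a regular and a per-worker lookup. */
final class SketchContainer {
  final Map<String, Long> regular = new HashMap<>();
  final Map<String, Long> perWorker = new HashMap<>();

  SketchCounter getCounter(String name) {
    return n -> regular.merge(name, n, Long::sum);
  }

  SketchCounter getPerWorkerCounter(String name) {
    return n -> perWorker.merge(name, n, Long::sum);
  }
}

/** One delegating counter; a flag picks the lookup instead of a second class. */
final class DelegatingCounterSketch implements SketchCounter {
  private final String name;
  private final boolean perWorker;
  private final Supplier<SketchContainer> containerSupplier;

  DelegatingCounterSketch(
      String name, boolean perWorker, Supplier<SketchContainer> containerSupplier) {
    this.name = name;
    this.perWorker = perWorker;
    this.containerSupplier = containerSupplier;
  }

  @Override
  public void inc(long n) {
    SketchContainer container = containerSupplier.get();
    if (container == null) {
      return; // mirrors the null check in the diff: no container, no update
    }
    SketchCounter target =
        perWorker ? container.getPerWorkerCounter(name) : container.getCounter(name);
    target.inc(n);
  }
}
```

Whether a flag, a subclass, or a strategy object fits best depends on how the existing delegating counter is used elsewhere, which this sketch does not try to answer.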
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.HistogramData;
+
+/** Implementation of {@link Histogram} that delegates to the instance for the current context. */
+@Internal
+public class DelegatingPerWorkerHistogram implements Metric, Histogram, Serializable {
+  private final MetricName name;
+  private final HistogramData.BucketType bucketType;
+  private final boolean processWideContainer;
+
+  public DelegatingPerWorkerHistogram(
+      MetricName name, HistogramData.BucketType bucketType, boolean processWideContainer) {
+    this.name = name;
+    this.bucketType = bucketType;
+    this.processWideContainer = processWideContainer;
+  }
+
+  public DelegatingPerWorkerHistogram(MetricName name, HistogramData.BucketType bucketType) {
+    this(name, bucketType, false);
+  }
+
+  @Override
+  public void update(double value) {
+    MetricsContainer container =
+        processWideContainer
+            ? MetricsEnvironment.getProcessWideContainer()
+            : MetricsEnvironment.getCurrentContainer();
+    if (container != null) {
+      container.getPerWorkerHistogram(name, bucketType).update(value);
Review Comment:
Same question here: is a separate delegating histogram needed for this one-line change?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -643,11 +650,20 @@ long flush(
contexts -> {
AppendRowsContext failedContext =
Preconditions.checkStateNotNull(Iterables.getFirst(contexts, null));
+ Instant operationEndTime = Instant.now();
Review Comment:
This pattern of grabbing the start time, updating a metric, grabbing the error
code, etc. appears all over the place.
Could it be repackaged into a reusable utility method for clarity?
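As a rough sketch of what such a helper could look like: `RpcLatencySketch.record` and its `sink` callback are hypothetical, and `java.time.Instant` is assumed here (the PR may use a different `Instant` type). The null check reflects the `@Nullable` start time visible in the diff.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.ObjLongConsumer;

/** Hypothetical helper bundling the "compute latency, tag status, report" steps. */
final class RpcLatencySketch {
  private RpcLatencySketch() {}

  /**
   * Reports the elapsed milliseconds between start and end under the given status.
   * Returns false, reporting nothing, when the start time is missing or after the end.
   */
  static boolean record(
      Instant start, Instant end, String status, ObjLongConsumer<String> sink) {
    if (start == null) {
      return false; // the operation never recorded a start time
    }
    long millis = Duration.between(start, end).toMillis();
    if (millis < 0) {
      return false; // clock skew or misordered arguments: skip the sample
    }
    sink.accept(status, millis);
    return true;
  }
}
```

Each call site would then shrink to a single call that passes its own start time, `Instant.now()`, a status label, and a callback that updates the actual metric.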