This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7159beac55f Minor updates to StreamingInsertsMetrics (#31003)
7159beac55f is described below
commit 7159beac55f6b3492bdf84df956cffe4df27fca9
Author: JayajP <[email protected]>
AuthorDate: Wed Apr 17 09:28:53 2024 -0700
Minor updates to StreamingInsertsMetrics (#31003)
---
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 12 +++--
.../io/gcp/bigquery/StreamingInsertsMetrics.java | 63 ++++++----------------
.../gcp/bigquery/StreamingInsertsMetricsTest.java | 13 +++--
3 files changed, 30 insertions(+), 58 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 2dbc02131f5..fa680004a7c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1079,6 +1079,7 @@ public class BigQueryServicesImpl implements BigQueryServices {
}
StreamingInsertsMetrics streamingInsertsResults =
BigQuerySinkMetrics.streamingInsertsMetrics();
+ int numFailedRows = 0;
final Set<Integer> failedIndices = new HashSet<>();
long retTotalDataSize = 0;
List<TableDataInsertAllResponse.InsertErrors> allErrors = new
ArrayList<>();
@@ -1135,7 +1136,7 @@ public class BigQueryServicesImpl implements BigQueryServices {
+ " pipeline, and the row will be output as a failed
insert.",
nextRowSize));
} else {
- streamingInsertsResults.incrementFailedRows();
+ numFailedRows += 1;
errorContainer.add(failedInserts, error, ref,
rowsToPublish.get(rowIndex));
failedIndices.add(rowIndex);
rowIndex++;
@@ -1223,7 +1224,7 @@ public class BigQueryServicesImpl implements BigQueryServices {
retryIds.add(idsToPublish.get(errorIndex));
}
} else {
- streamingInsertsResults.incrementFailedRows();
+ numFailedRows += 1;
errorContainer.add(failedInserts, error, ref,
rowsToPublish.get(errorIndex));
}
}
@@ -1234,7 +1235,8 @@ public class BigQueryServicesImpl implements BigQueryServices {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while inserting " +
rowsToPublish);
} catch (ExecutionException e) {
- streamingInsertsResults.updateStreamingInsertsMetrics(ref);
+ streamingInsertsResults.updateStreamingInsertsMetrics(
+ ref, rowList.size(), rowList.size());
throw new RuntimeException(e.getCause());
}
@@ -1276,8 +1278,8 @@ public class BigQueryServicesImpl implements BigQueryServices {
}
}
}
- streamingInsertsResults.updateSuccessfulAndFailedRows(rowList.size(),
allErrors.size());
- streamingInsertsResults.updateStreamingInsertsMetrics(ref);
+ numFailedRows += allErrors.size();
+ streamingInsertsResults.updateStreamingInsertsMetrics(ref,
rowList.size(), numFailedRows);
if (!allErrors.isEmpty()) {
throw new IOException("Insert failed: " + allErrors);
} else {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java
index c451856f06a..2ada2e94dc1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java
@@ -40,11 +40,8 @@ public interface StreamingInsertsMetrics {
void updateSuccessfulRpcMetrics(Instant start, Instant end);
- void incrementFailedRows();
-
- void updateSuccessfulAndFailedRows(int totalRows, int failedRows);
-
- void updateStreamingInsertsMetrics(@Nullable TableReference tableRef);
+ void updateStreamingInsertsMetrics(
+ @Nullable TableReference tableRef, int totalRows, int failedRows);
/** No-op implementation of {@code StreamingInsertsResults}. */
class NoOpStreamingInsertsMetrics implements StreamingInsertsMetrics {
@@ -60,13 +57,8 @@ public interface StreamingInsertsMetrics {
public void updateSuccessfulRpcMetrics(Instant start, Instant end) {}
@Override
- public void incrementFailedRows() {}
-
- @Override
- public void updateSuccessfulAndFailedRows(int totalRows, int failedRows) {}
-
- @Override
- public void updateStreamingInsertsMetrics(@Nullable TableReference
tableRef) {}
+ public void updateStreamingInsertsMetrics(
+ @Nullable TableReference tableRef, int totalRows, int failedRows) {}
private static NoOpStreamingInsertsMetrics singleton = new
NoOpStreamingInsertsMetrics();
@@ -87,6 +79,10 @@ public interface StreamingInsertsMetrics {
*/
@AutoValue
abstract class StreamingInsertsMetricsImpl implements
StreamingInsertsMetrics {
+ static final Histogram LATENCY_HISTOGRAM =
+ BigQuerySinkMetrics.createRPCLatencyHistogram(
+ BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS);
+
abstract ConcurrentLinkedQueue<java.time.Duration> rpcLatencies();
abstract ConcurrentLinkedQueue<String> rpcErrorStatus();
@@ -96,10 +92,6 @@ public interface StreamingInsertsMetrics {
abstract AtomicInteger successfulRpcsCount();
- abstract AtomicInteger successfulRowsCount();
-
- abstract AtomicInteger failedRowsCount();
-
abstract AtomicBoolean isWritable();
public static StreamingInsertsMetricsImpl create() {
@@ -108,8 +100,6 @@ public interface StreamingInsertsMetrics {
new ConcurrentLinkedQueue<>(),
new ConcurrentLinkedQueue<>(),
new AtomicInteger(),
- new AtomicInteger(),
- new AtomicInteger(),
new AtomicBoolean(true));
}
@@ -139,31 +129,11 @@ public interface StreamingInsertsMetrics {
}
}
- /** Increment the failed rows count by one. */
- @Override
- public void incrementFailedRows() {
- if (isWritable().get()) {
- failedRowsCount().getAndIncrement();
- }
- }
-
- /** Increment the failed rows count, and set the successful rows count. */
- @Override
- public void updateSuccessfulAndFailedRows(int totalRows, int failedRows) {
- if (isWritable().get()) {
- failedRowsCount().getAndAdd(failedRows);
- successfulRowsCount().set(totalRows - failedRowsCount().get());
- }
- }
-
/** Record rpc latency histogram metrics. */
private void recordRpcLatencyMetrics() {
- Histogram latencyHistogram =
- BigQuerySinkMetrics.createRPCLatencyHistogram(
- BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS);
- double[] rpcLatencies =
- rpcLatencies().stream().mapToDouble(duration ->
duration.toMillis()).toArray();
- latencyHistogram.update(rpcLatencies);
+ for (Duration d : rpcLatencies()) {
+ LATENCY_HISTOGRAM.update(d.toMillis());
+ }
}
/**
@@ -174,7 +144,8 @@ public interface StreamingInsertsMetrics {
* @param tableRef BigQuery table that was written to, return early if
null.
*/
@Override
- public void updateStreamingInsertsMetrics(@Nullable TableReference
tableRef) {
+ public void updateStreamingInsertsMetrics(
+ @Nullable TableReference tableRef, int totalRows, int failedRows) {
if (!isWritable().compareAndSet(true, false)) {
// Metrics have already been exported.
return;
@@ -221,16 +192,16 @@ public interface StreamingInsertsMetrics {
.inc(successfulRpcsCount().longValue());
}
- if (failedRowsCount().get() != 0) {
+ if (failedRows >= 0) {
BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.FAILED,
BigQuerySinkMetrics.INTERNAL, shortTableId)
- .inc(failedRowsCount().longValue());
+ .inc(failedRows);
}
- if (successfulRowsCount().get() != 0) {
+ if (totalRows - failedRows >= 0) {
BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.SUCCESSFUL,
BigQuerySinkMetrics.OK, shortTableId)
- .inc(successfulRowsCount().longValue());
+ .inc(totalRows - failedRows);
}
recordRpcLatencyMetrics();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
index 274b6161099..4ba66814419 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
@@ -44,7 +44,7 @@ public class StreamingInsertsMetricsTest {
results.updateSuccessfulRpcMetrics(t1, t1.plus(Duration.ofMillis(10)));
TableReference ref = new
TableReference().setTableId("t").setDatasetId("d");
- results.updateStreamingInsertsMetrics(ref);
+ results.updateStreamingInsertsMetrics(ref, 5, 0);
assertThat(testContainer.perWorkerCounters.size(), equalTo(0));
assertThat(testContainer.perWorkerHistograms.size(), equalTo(0));
@@ -62,7 +62,7 @@ public class StreamingInsertsMetricsTest {
Instant t1 = Instant.now();
results.updateSuccessfulRpcMetrics(t1, t1.plus(Duration.ofMillis(10)));
- results.updateStreamingInsertsMetrics(null);
+ results.updateStreamingInsertsMetrics(null, 0, 0);
assertThat(testContainer.perWorkerCounters.size(), equalTo(0));
assertThat(testContainer.perWorkerHistograms.size(), equalTo(0));
@@ -88,12 +88,11 @@ public class StreamingInsertsMetricsTest {
StreamingInsertsMetrics results =
StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
results.updateRetriedRowsWithStatus("INTERNAL", 10);
- results.updateSuccessfulAndFailedRows(50, 30);
results.updateRetriedRowsWithStatus("QuotaLimits", 10);
results.updateRetriedRowsWithStatus("QuotaLimits", 5);
results.updateRetriedRowsWithStatus("ServiceUnavailable", 5);
- results.updateStreamingInsertsMetrics(ref);
+ results.updateStreamingInsertsMetrics(ref, 50, 30);
String tableId = "datasets/d/tables/t";
MetricName internalErrorRetriedMetricName =
@@ -129,7 +128,7 @@ public class StreamingInsertsMetricsTest {
results.updateFailedRpcMetrics(t1, t1.plus(Duration.ofMillis(30)),
"PermissionDenied");
results.updateFailedRpcMetrics(t1, t1.plus(Duration.ofMillis(40)),
"Unavailable");
- results.updateStreamingInsertsMetrics(ref);
+ results.updateStreamingInsertsMetrics(ref, 5, 0);
// Validate RPC latency metric.
MetricName histogramName =
@@ -162,7 +161,7 @@ public class StreamingInsertsMetricsTest {
StreamingInsertsMetrics results =
StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
results.updateRetriedRowsWithStatus("INTERNAL", 10);
- results.updateStreamingInsertsMetrics(ref);
+ results.updateStreamingInsertsMetrics(ref, 5, 0);
String tableId = "datasets/d/tables/t";
MetricName internalErrorRetriedMetricName =
@@ -172,7 +171,7 @@ public class StreamingInsertsMetricsTest {
// Subsequent updates to this object should update the underyling metrics.
results.updateRetriedRowsWithStatus("INTERNAL", 10);
- results.updateStreamingInsertsMetrics(ref);
+ results.updateStreamingInsertsMetrics(ref, 5, 0);
testContainer.assertPerWorkerCounterValue(internalErrorRetriedMetricName,
10L);
}