This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1e8a16d9eae Use base 2 exponential histograms in BigQuerySinkMetrics
(#31000)
1e8a16d9eae is described below
commit 1e8a16d9eae3bfd8c42ea102c0ed2d7de5cbe060
Author: JayajP <[email protected]>
AuthorDate: Wed Apr 17 09:26:20 2024 -0700
Use base 2 exponential histograms in BigQuerySinkMetrics (#31000)
* Use base 2 exponential histograms in BigQuerySinkMetrics
---
.../sdk/io/gcp/bigquery/BigQuerySinkMetrics.java | 7 +-
.../io/gcp/bigquery/BigQuerySinkMetricsTest.java | 76 +++++-----------------
.../gcp/bigquery/StreamingInsertsMetricsTest.java | 2 +-
3 files changed, 24 insertions(+), 61 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
index b80438aceeb..ecdf8eee4bf 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
@@ -106,7 +106,7 @@ public class BigQuerySinkMetrics {
* <p>'RpcLatency-Method:{method};'
*
* @param method StorageWriteAPI method associated with this metric.
- * @return Histogram with exponential buckets with a sqrt(2) growth factor.
+ * @return Histogram with exponential buckets with a growth factor of 2.
*/
static Histogram createRPCLatencyHistogram(RpcMethod method) {
LabeledMetricNameUtils.MetricNameBuilder nameBuilder =
@@ -114,7 +114,10 @@ public class BigQuerySinkMetrics {
nameBuilder.addLabel(RPC_METHOD, method.toString());
MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
- HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1,
34);
+ // Create exponential histogram buckets with the following parameters:
+ // Scale 0, resulting in bucket widths that grow by a factor of 2.
+ // 17 buckets, so the max latency that can be stored is 2^17 millis (~131 seconds).
+ HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(0,
17);
return new DelegatingHistogram(metricName, buckets, false, true);
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
index 37472c4db3d..5254ab514db 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
@@ -128,10 +128,7 @@ public class BigQuerySinkMetricsTest {
MetricName deletesDisabledCounterName =
MetricName.named(
"BigQuerySink",
"RowsAppendedCount*row_status:SUCCESSFUL;rpc_status:rpcStatus;");
- assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(deletesDisabledCounterName));
- assertThat(
-
testContainer.perWorkerCounters.get(deletesDisabledCounterName).getCumulative(),
- equalTo(1L));
+ testContainer.assertPerWorkerCounterValue(deletesDisabledCounterName, 1L);
BigQuerySinkMetrics.setSupportMetricsDeletion(true);
testContainer.reset();
@@ -143,10 +140,7 @@ public class BigQuerySinkMetricsTest {
MetricName.named(
"BigQuerySink",
"RowsAppendedCount*row_status:SUCCESSFUL;rpc_status:rpcStatus;table_id:tableId;");
- assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(deletesEnabledCounterName));
- assertThat(
-
testContainer.perWorkerCounters.get(deletesEnabledCounterName).getCumulative(),
- equalTo(1L));
+ testContainer.assertPerWorkerCounterValue(deletesEnabledCounterName, 1L);
}
@Test
@@ -179,8 +173,7 @@ public class BigQuerySinkMetricsTest {
MetricName counterName =
MetricName.named("BigQuerySink",
"ThrottledTime*rpc_method:APPEND_ROWS;");
- assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterName));
-
assertThat(testContainer.perWorkerCounters.get(counterName).getCumulative(),
equalTo(1L));
+ testContainer.assertPerWorkerCounterValue(counterName, 1L);
}
@Test
@@ -201,14 +194,9 @@ public class BigQuerySinkMetricsTest {
MetricName.named("BigQuerySink",
"RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:OK;");
MetricName histogramName =
MetricName.named("BigQuerySink", "RpcLatency*rpc_method:APPEND_ROWS;");
- HistogramData.BucketType bucketType =
HistogramData.ExponentialBuckets.of(1, 34);
- assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterNameDisabledDeletes));
- assertThat(
-
testContainer.perWorkerCounters.get(counterNameDisabledDeletes).getCumulative(),
- equalTo(1L));
- assertThat(
- testContainer.perWorkerHistograms.get(KV.of(histogramName,
bucketType)).values,
- containsInAnyOrder(Double.valueOf(3.0)));
+ HistogramData.BucketType bucketType =
HistogramData.ExponentialBuckets.of(0, 17);
+ testContainer.assertPerWorkerCounterValue(counterNameDisabledDeletes, 1L);
+ testContainer.assertPerWorkerHistogramValues(histogramName, bucketType,
3.0);
// Test enable SupportMetricsDeletion.
BigQuerySinkMetrics.setSupportMetricsDeletion(true);
@@ -219,13 +207,8 @@ public class BigQuerySinkMetricsTest {
MetricName.named(
"BigQuerySink",
"RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:OK;table_id:tableId;");
- assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterNameEnabledDeletes));
- assertThat(
-
testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),
- equalTo(1L));
- assertThat(
- testContainer.perWorkerHistograms.get(KV.of(histogramName,
bucketType)).values,
- containsInAnyOrder(Double.valueOf(3.0)));
+ testContainer.assertPerWorkerCounterValue(counterNameEnabledDeletes, 1L);
+ testContainer.assertPerWorkerHistogramValues(histogramName, bucketType,
3.0);
}
@Test
@@ -251,17 +234,9 @@ public class BigQuerySinkMetricsTest {
"BigQuerySink",
"RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:NOT_FOUND;");
MetricName histogramName =
MetricName.named("BigQuerySink", "RpcLatency*rpc_method:APPEND_ROWS;");
- HistogramData.BucketType bucketType =
HistogramData.ExponentialBuckets.of(1, 34);
- assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterNameDisabledDeletes));
- assertThat(
-
testContainer.perWorkerCounters.get(counterNameDisabledDeletes).getCumulative(),
- equalTo(1L));
- assertThat(
- testContainer.perWorkerHistograms,
- IsMapContaining.hasKey(KV.of(histogramName, bucketType)));
- assertThat(
- testContainer.perWorkerHistograms.get(KV.of(histogramName,
bucketType)).values,
- containsInAnyOrder(Double.valueOf(5.0)));
+ HistogramData.BucketType bucketType =
HistogramData.ExponentialBuckets.of(0, 17);
+ testContainer.assertPerWorkerCounterValue(counterNameDisabledDeletes, 1L);
+ testContainer.assertPerWorkerHistogramValues(histogramName, bucketType,
5.0);
// Test enable SupportMetricsDeletion
BigQuerySinkMetrics.setSupportMetricsDeletion(true);
@@ -272,13 +247,8 @@ public class BigQuerySinkMetricsTest {
MetricName.named(
"BigQuerySink",
"RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:NOT_FOUND;table_id:tableId;");
- assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterNameEnabledDeletes));
- assertThat(
-
testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),
- equalTo(1L));
- assertThat(
- testContainer.perWorkerHistograms.get(KV.of(histogramName,
bucketType)).values,
- containsInAnyOrder(Double.valueOf(5.0)));
+ testContainer.assertPerWorkerCounterValue(counterNameEnabledDeletes, 1L);
+ testContainer.assertPerWorkerHistogramValues(histogramName, bucketType,
5.0);
}
@Test
@@ -303,14 +273,9 @@ public class BigQuerySinkMetricsTest {
"BigQuerySink",
"RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:UNKNOWN;");
MetricName histogramName =
MetricName.named("BigQuerySink", "RpcLatency*rpc_method:APPEND_ROWS;");
- HistogramData.BucketType bucketType =
HistogramData.ExponentialBuckets.of(1, 34);
- assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterNameDisabledDeletes));
- assertThat(
-
testContainer.perWorkerCounters.get(counterNameDisabledDeletes).getCumulative(),
- equalTo(1L));
- assertThat(
- testContainer.perWorkerHistograms.get(KV.of(histogramName,
bucketType)).values,
- containsInAnyOrder(Double.valueOf(15.0)));
+ HistogramData.BucketType bucketType =
HistogramData.ExponentialBuckets.of(0, 17);
+ testContainer.assertPerWorkerCounterValue(counterNameDisabledDeletes, 1L);
+ testContainer.assertPerWorkerHistogramValues(histogramName, bucketType,
15.0);
// Test enable SupportMetricsDeletion
BigQuerySinkMetrics.setSupportMetricsDeletion(true);
@@ -321,13 +286,8 @@ public class BigQuerySinkMetricsTest {
MetricName.named(
"BigQuerySink",
"RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:UNKNOWN;table_id:tableId;");
- assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterNameEnabledDeletes));
- assertThat(
-
testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),
- equalTo(1L));
- assertThat(
- testContainer.perWorkerHistograms.get(KV.of(histogramName,
bucketType)).values,
- containsInAnyOrder(Double.valueOf(15.0)));
+ testContainer.assertPerWorkerCounterValue(counterNameEnabledDeletes, 1L);
+ testContainer.assertPerWorkerHistogramValues(histogramName, bucketType,
15.0);
}
@Test
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
index 9e5b69d6087..274b6161099 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
@@ -134,7 +134,7 @@ public class StreamingInsertsMetricsTest {
// Validate RPC latency metric.
MetricName histogramName =
MetricName.named("BigQuerySink",
"RpcLatency*rpc_method:STREAMING_INSERTS;");
- HistogramData.BucketType bucketType =
HistogramData.ExponentialBuckets.of(1, 34);
+ HistogramData.BucketType bucketType =
HistogramData.ExponentialBuckets.of(0, 17);
testContainer.assertPerWorkerHistogramValues(histogramName, bucketType,
10.0, 20.0, 30.0, 40.0);
// Validate RPC Status metric.