This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e8a16d9eae Use base 2 exponential histograms in BigQuerySinkMetrics 
(#31000)
1e8a16d9eae is described below

commit 1e8a16d9eae3bfd8c42ea102c0ed2d7de5cbe060
Author: JayajP <[email protected]>
AuthorDate: Wed Apr 17 09:26:20 2024 -0700

    Use base 2 exponential histograms in BigQuerySinkMetrics (#31000)
    
    * Use base 2 exponential histograms in BigQuerySinkMetrics
---
 .../sdk/io/gcp/bigquery/BigQuerySinkMetrics.java   |  7 +-
 .../io/gcp/bigquery/BigQuerySinkMetricsTest.java   | 76 +++++-----------------
 .../gcp/bigquery/StreamingInsertsMetricsTest.java  |  2 +-
 3 files changed, 24 insertions(+), 61 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
index b80438aceeb..ecdf8eee4bf 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
@@ -106,7 +106,7 @@ public class BigQuerySinkMetrics {
    * <p>'RpcLatency-Method:{method};'
    *
    * @param method StorageWriteAPI method associated with this metric.
-   * @return Histogram with exponential buckets with a sqrt(2) growth factor.
+   * @return Histogram with exponential buckets with a size 2 growth factor.
    */
   static Histogram createRPCLatencyHistogram(RpcMethod method) {
     LabeledMetricNameUtils.MetricNameBuilder nameBuilder =
@@ -114,7 +114,10 @@ public class BigQuerySinkMetrics {
     nameBuilder.addLabel(RPC_METHOD, method.toString());
     MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
 
-    HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 
34);
+    // Create Exponential histogram buckets with the following parameters:
+    // 0 scale, resulting in bucket widths with a size 2 growth factor.
+    // 17 buckets, so the max latency that can be stored is (2^17 millis ~= 
130 seconds).
+    HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(0, 
17);
 
     return new DelegatingHistogram(metricName, buckets, false, true);
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
index 37472c4db3d..5254ab514db 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
@@ -128,10 +128,7 @@ public class BigQuerySinkMetricsTest {
     MetricName deletesDisabledCounterName =
         MetricName.named(
             "BigQuerySink", 
"RowsAppendedCount*row_status:SUCCESSFUL;rpc_status:rpcStatus;");
-    assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(deletesDisabledCounterName));
-    assertThat(
-        
testContainer.perWorkerCounters.get(deletesDisabledCounterName).getCumulative(),
-        equalTo(1L));
+    testContainer.assertPerWorkerCounterValue(deletesDisabledCounterName, 1L);
 
     BigQuerySinkMetrics.setSupportMetricsDeletion(true);
     testContainer.reset();
@@ -143,10 +140,7 @@ public class BigQuerySinkMetricsTest {
         MetricName.named(
             "BigQuerySink",
             
"RowsAppendedCount*row_status:SUCCESSFUL;rpc_status:rpcStatus;table_id:tableId;");
-    assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(deletesEnabledCounterName));
-    assertThat(
-        
testContainer.perWorkerCounters.get(deletesEnabledCounterName).getCumulative(),
-        equalTo(1L));
+    testContainer.assertPerWorkerCounterValue(deletesEnabledCounterName, 1L);
   }
 
   @Test
@@ -179,8 +173,7 @@ public class BigQuerySinkMetricsTest {
 
     MetricName counterName =
         MetricName.named("BigQuerySink", 
"ThrottledTime*rpc_method:APPEND_ROWS;");
-    assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterName));
-    
assertThat(testContainer.perWorkerCounters.get(counterName).getCumulative(), 
equalTo(1L));
+    testContainer.assertPerWorkerCounterValue(counterName, 1L);
   }
 
   @Test
@@ -201,14 +194,9 @@ public class BigQuerySinkMetricsTest {
         MetricName.named("BigQuerySink", 
"RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:OK;");
     MetricName histogramName =
         MetricName.named("BigQuerySink", "RpcLatency*rpc_method:APPEND_ROWS;");
-    HistogramData.BucketType bucketType = 
HistogramData.ExponentialBuckets.of(1, 34);
-    assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterNameDisabledDeletes));
-    assertThat(
-        
testContainer.perWorkerCounters.get(counterNameDisabledDeletes).getCumulative(),
-        equalTo(1L));
-    assertThat(
-        testContainer.perWorkerHistograms.get(KV.of(histogramName, 
bucketType)).values,
-        containsInAnyOrder(Double.valueOf(3.0)));
+    HistogramData.BucketType bucketType = 
HistogramData.ExponentialBuckets.of(0, 17);
+    testContainer.assertPerWorkerCounterValue(counterNameDisabledDeletes, 1L);
+    testContainer.assertPerWorkerHistogramValues(histogramName, bucketType, 
3.0);
 
     // Test enable SupportMetricsDeletion.
     BigQuerySinkMetrics.setSupportMetricsDeletion(true);
@@ -219,13 +207,8 @@ public class BigQuerySinkMetricsTest {
         MetricName.named(
             "BigQuerySink",
             
"RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:OK;table_id:tableId;");
-    assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterNameEnabledDeletes));
-    assertThat(
-        
testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),
-        equalTo(1L));
-    assertThat(
-        testContainer.perWorkerHistograms.get(KV.of(histogramName, 
bucketType)).values,
-        containsInAnyOrder(Double.valueOf(3.0)));
+    testContainer.assertPerWorkerCounterValue(counterNameEnabledDeletes, 1L);
+    testContainer.assertPerWorkerHistogramValues(histogramName, bucketType, 
3.0);
   }
 
   @Test
@@ -251,17 +234,9 @@ public class BigQuerySinkMetricsTest {
             "BigQuerySink", 
"RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:NOT_FOUND;");
     MetricName histogramName =
         MetricName.named("BigQuerySink", "RpcLatency*rpc_method:APPEND_ROWS;");
-    HistogramData.BucketType bucketType = 
HistogramData.ExponentialBuckets.of(1, 34);
-    assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterNameDisabledDeletes));
-    assertThat(
-        
testContainer.perWorkerCounters.get(counterNameDisabledDeletes).getCumulative(),
-        equalTo(1L));
-    assertThat(
-        testContainer.perWorkerHistograms,
-        IsMapContaining.hasKey(KV.of(histogramName, bucketType)));
-    assertThat(
-        testContainer.perWorkerHistograms.get(KV.of(histogramName, 
bucketType)).values,
-        containsInAnyOrder(Double.valueOf(5.0)));
+    HistogramData.BucketType bucketType = 
HistogramData.ExponentialBuckets.of(0, 17);
+    testContainer.assertPerWorkerCounterValue(counterNameDisabledDeletes, 1L);
+    testContainer.assertPerWorkerHistogramValues(histogramName, bucketType, 
5.0);
 
     // Test enable SupportMetricsDeletion
     BigQuerySinkMetrics.setSupportMetricsDeletion(true);
@@ -272,13 +247,8 @@ public class BigQuerySinkMetricsTest {
         MetricName.named(
             "BigQuerySink",
             
"RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:NOT_FOUND;table_id:tableId;");
-    assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterNameEnabledDeletes));
-    assertThat(
-        
testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),
-        equalTo(1L));
-    assertThat(
-        testContainer.perWorkerHistograms.get(KV.of(histogramName, 
bucketType)).values,
-        containsInAnyOrder(Double.valueOf(5.0)));
+    testContainer.assertPerWorkerCounterValue(counterNameEnabledDeletes, 1L);
+    testContainer.assertPerWorkerHistogramValues(histogramName, bucketType, 
5.0);
   }
 
   @Test
@@ -303,14 +273,9 @@ public class BigQuerySinkMetricsTest {
             "BigQuerySink", 
"RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:UNKNOWN;");
     MetricName histogramName =
         MetricName.named("BigQuerySink", "RpcLatency*rpc_method:APPEND_ROWS;");
-    HistogramData.BucketType bucketType = 
HistogramData.ExponentialBuckets.of(1, 34);
-    assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterNameDisabledDeletes));
-    assertThat(
-        
testContainer.perWorkerCounters.get(counterNameDisabledDeletes).getCumulative(),
-        equalTo(1L));
-    assertThat(
-        testContainer.perWorkerHistograms.get(KV.of(histogramName, 
bucketType)).values,
-        containsInAnyOrder(Double.valueOf(15.0)));
+    HistogramData.BucketType bucketType = 
HistogramData.ExponentialBuckets.of(0, 17);
+    testContainer.assertPerWorkerCounterValue(counterNameDisabledDeletes, 1L);
+    testContainer.assertPerWorkerHistogramValues(histogramName, bucketType, 
15.0);
 
     // Test enable SupportMetricsDeletion
     BigQuerySinkMetrics.setSupportMetricsDeletion(true);
@@ -321,13 +286,8 @@ public class BigQuerySinkMetricsTest {
         MetricName.named(
             "BigQuerySink",
             
"RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:UNKNOWN;table_id:tableId;");
-    assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterNameEnabledDeletes));
-    assertThat(
-        
testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),
-        equalTo(1L));
-    assertThat(
-        testContainer.perWorkerHistograms.get(KV.of(histogramName, 
bucketType)).values,
-        containsInAnyOrder(Double.valueOf(15.0)));
+    testContainer.assertPerWorkerCounterValue(counterNameEnabledDeletes, 1L);
+    testContainer.assertPerWorkerHistogramValues(histogramName, bucketType, 
15.0);
   }
 
   @Test
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
index 9e5b69d6087..274b6161099 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
@@ -134,7 +134,7 @@ public class StreamingInsertsMetricsTest {
     // Validate RPC latency metric.
     MetricName histogramName =
         MetricName.named("BigQuerySink", 
"RpcLatency*rpc_method:STREAMING_INSERTS;");
-    HistogramData.BucketType bucketType = 
HistogramData.ExponentialBuckets.of(1, 34);
+    HistogramData.BucketType bucketType = 
HistogramData.ExponentialBuckets.of(0, 17);
     testContainer.assertPerWorkerHistogramValues(histogramName, bucketType, 
10.0, 20.0, 30.0, 40.0);
 
     // Validate RPC Status metric.

Reply via email to