ahmedabu98 commented on code in PR #31253:
URL: https://github.com/apache/beam/pull/31253#discussion_r1602206001
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java:
##########
@@ -166,12 +168,17 @@ public static Counter appendRowsRowStatusCounter(
* @return Counter that tracks throttled time due to RPC retries.
*/
public static Counter throttledTimeCounter(RpcMethod method) {
+
LabeledMetricNameUtils.MetricNameBuilder nameBuilder =
LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(THROTTLED_TIME);
nameBuilder.addLabel(RPC_METHOD, method.toString());
MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
-
- return new DelegatingCounter(metricName, false, true);
+ // for specific method
+ Counter fineCounter = new DelegatingCounter(metricName, false, true);
+ // for overall throttling time, used by runner for scaling decision
+ Counter coarseCounter =
BigQueryServicesImpl.StorageClientImpl.THROTTLING_MSECS;
+ return new NestedCounter(
+ MetricName.named(METRICS_NAMESPACE, "nested"), fineCounter,
coarseCounter);
Review Comment:
Can we keep the name as it was previously, so that the observability experience
stays consistent?
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java:
##########
@@ -212,4 +215,60 @@ public void testInsertAll() throws Exception {
// Each of the 25 rows has 1 byte for length and 30 bytes:
'{"f":[{"v":"foo"},{"v":1234}]}'
assertEquals("Incorrect byte count", 25L * 31L, totalBytes);
}
+
+ static class ReadableCounter implements Counter {
+
+ private MetricName name;
+ private long value;
+
+ public ReadableCounter(MetricName name) {
+ this.name = name;
+ this.value = 0;
+ }
+
+ public long getValue() {
+ return value;
+ }
+
+ @Override
+ public void inc() {
+ ++value;
+ }
+
+ @Override
+ public void inc(long n) {
+ value += n;
+ }
+
+ @Override
+ public void dec() {
+ --value;
+ }
+
+ @Override
+ public void dec(long n) {
+ value -= n;
+ }
+
+ @Override
+ public MetricName getName() {
+ return name;
+ }
+ }
+
+ @Test
+ public void testNestedCounter() {
+ MetricName name1 = MetricName.named(this.getClass(), "metric1");
+ MetricName name2 = MetricName.named(this.getClass(), "metric2");
+ ReadableCounter counter1 = new ReadableCounter(name1);
+ ReadableCounter counter2 = new ReadableCounter(name2);
+ NestedCounter nested =
+ new NestedCounter(MetricName.named(this.getClass(), "nested"),
counter1, counter2);
+ nested.inc();
+ nested.inc(10);
+ nested.dec();
+ nested.dec(2);
+ assertEquals(8, counter1.getValue());
+ assertEquals(8, counter2.getValue());
+ }
Review Comment:
Suggestion:
Increment one sub-counter directly before incrementing through the nested
counter, to check that both increments are recognized.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java:
##########
@@ -1081,4 +1084,52 @@ private static Object convertAvroNumeric(Object value) {
public static ServiceCallMetric writeCallMetric(TableReference
tableReference) {
return callMetricForMethod(tableReference, "BigQueryBatchWrite");
}
+
+ /**
+ * A counter holding a list of counters. Increment the counter will
increment every sub-counter it
+ * holds.
+ */
+ static class NestedCounter implements Counter, Serializable {
+
Review Comment:
This is a nice abstraction. Should we put it in the
`org.apache.beam.sdk.metrics` package instead?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java:
##########
@@ -1081,4 +1084,52 @@ private static Object convertAvroNumeric(Object value) {
public static ServiceCallMetric writeCallMetric(TableReference
tableReference) {
return callMetricForMethod(tableReference, "BigQueryBatchWrite");
}
+
+ /**
+ * A counter holding a list of counters. Increment the counter will
increment every sub-counter it
+ * holds.
+ */
+ static class NestedCounter implements Counter, Serializable {
+
+ private final MetricName name;
+ private final ImmutableList<Counter> counters;
+
+ public NestedCounter(MetricName name, Counter... counters) {
+ this.name = name;
+ this.counters = ImmutableList.copyOf(counters);
+ }
+
+ @Override
+ public void inc() {
+ for (Counter counter : counters) {
+ counter.inc();
+ }
+ }
+
+ @Override
+ public void inc(long n) {
+ for (Counter counter : counters) {
+ counter.inc(n);
+ }
+ }
+
+ @Override
+ public void dec() {
+ for (Counter counter : counters) {
+ counter.dec();
+ }
+ }
+
+ @Override
+ public void dec(long n) {
+ for (Counter counter : counters) {
+ counter.dec(n);
+ }
+ }
+
+ @Override
+ public MetricName getName() {
+ return name;
+ }
Review Comment:
Does it make sense to also include the sub-counter names here?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -818,8 +818,8 @@ public void process(
RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
new RetryManager<>(
Duration.standardSeconds(1),
- Duration.standardSeconds(10),
- 1000,
+ Duration.standardSeconds(20),
+ 500,
Review Comment:
SGTM
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]