[ 
https://issues.apache.org/jira/browse/BEAM-14121?focusedWorklogId=772191&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772191
 ]

ASF GitHub Bot logged work on BEAM-14121:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/May/22 00:02
            Start Date: 19/May/22 00:02
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on code in PR #17335:
URL: https://github.com/apache/beam/pull/17335#discussion_r876468653


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -377,6 +382,12 @@ public class SpannerIO {
   // Multiple of mutation size to use to gather and sort mutations
   private static final int DEFAULT_GROUPING_FACTOR = 1000;
 
+  // Size of caches for read/write ServiceCallMetric objects .
+  // This is a reasonable limit, as for reads, each worker will process very few different table
+  // read requests, and for writes, batching will ensure that write operations for the same
+  // table occur at

Review Comment:
   Incomplete comment.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       Transaction tx = c.sideInput(txView);
-      BatchReadOnlyTransaction context =
+      BatchReadOnlyTransaction batchTx =
           spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
-      for (Partition p : execute(c.element(), context)) {
-        c.output(p);
-      }
-    }
-
-    private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction tx) {
-      if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) {
-        return executeWithPriority(op, tx, config.getRpcPriority().get());
-      } else {
-        return executeWithoutPriority(op, tx);
-      }
-    }
-
-    private List<Partition> executeWithoutPriority(ReadOperation op, BatchReadOnlyTransaction tx) {
-      // Query was selected.
-      if (op.getQuery() != null) {
-        return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
-      }
-      // Read with index was selected.
-      if (op.getIndex() != null) {
-        return tx.partitionReadUsingIndex(
-            op.getPartitionOptions(),
-            op.getTable(),
-            op.getIndex(),
-            op.getKeySet(),
-            op.getColumns());
-      }
-      // Read from table was selected.
-      return tx.partitionRead(
-          op.getPartitionOptions(), op.getTable(), op.getKeySet(), op.getColumns());
-    }
-
-    private List<Partition> executeWithPriority(
-        ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority rpcPriority) {
-      // Query was selected.
-      if (op.getQuery() != null) {
-        return tx.partitionQuery(
-            op.getPartitionOptions(), op.getQuery(), Options.priority(rpcPriority));
+      ReadOperation op = c.element();
+
+      // While this creates a ServiceCallMetric for every input element, in reality, the number
+      // of input elements will either be very few (normally 1!), or they will differ and
+      // need different metrics.
+      ServiceCallMetric metric = ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+      List<Partition> partitions;
+      try {
+        if (op.getQuery() != null) {
+          // Query was selected.
+          partitions =
+              batchTx.partitionQuery(
+                  op.getPartitionOptions(),
+                  op.getQuery(),
+                  Options.priority(config.getRpcPriority().get()));
+        } else if (op.getIndex() != null) {
+          // Read with index was selected.
+          partitions =
+              batchTx.partitionReadUsingIndex(
+                  op.getPartitionOptions(),
+                  op.getTable(),
+                  op.getIndex(),
+                  op.getKeySet(),
+                  op.getColumns(),
+                  Options.priority(config.getRpcPriority().get()));

Review Comment:
   Previously this was called without a priority if config.getRpcPriority() == null or config.getRpcPriority().get() == null.
   
   Are we sure that this will not introduce a behavior change?
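
   To illustrate the concern: the removed execute() method passed a priority option only when both the provider and its value were non-null. A minimal self-contained sketch of that guard is below. PriorityOptions, ReadOption, RpcPriority and readOptionsFor are hypothetical stand-ins written for this example; they are not the Spanner client's Options API and not the code in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-ins for illustration; not the Spanner client API or the PR's code. */
final class PriorityOptions {
  private PriorityOptions() {}

  /** Stand-in for an RPC priority enum. */
  enum RpcPriority { LOW, MEDIUM, HIGH }

  /** Stand-in for a per-call option object. */
  static final class ReadOption {
    final RpcPriority priority;

    ReadOption(RpcPriority priority) {
      this.priority = priority;
    }
  }

  /**
   * Returns a priority option only when both the provider and its value are
   * non-null, mirroring the null check in the removed execute() method.
   * An empty array means "call without a priority".
   */
  static ReadOption[] readOptionsFor(Supplier<RpcPriority> provider) {
    List<ReadOption> options = new ArrayList<>();
    if (provider != null && provider.get() != null) {
      options.add(new ReadOption(provider.get()));
    }
    return options.toArray(new ReadOption[0]);
  }
}
```

   With a helper of this shape, the three partition calls could receive the same (possibly empty) options array, assuming the underlying methods accept a varargs list of options.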



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       Transaction tx = c.sideInput(txView);
-      BatchReadOnlyTransaction context =
+      BatchReadOnlyTransaction batchTx =
           spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
-      for (Partition p : execute(c.element(), context)) {
-        c.output(p);
-      }
-    }
-
-    private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction tx) {
-      if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) {
-        return executeWithPriority(op, tx, config.getRpcPriority().get());
-      } else {
-        return executeWithoutPriority(op, tx);
-      }
-    }
-
-    private List<Partition> executeWithoutPriority(ReadOperation op, BatchReadOnlyTransaction tx) {
-      // Query was selected.
-      if (op.getQuery() != null) {
-        return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
-      }
-      // Read with index was selected.
-      if (op.getIndex() != null) {
-        return tx.partitionReadUsingIndex(
-            op.getPartitionOptions(),
-            op.getTable(),
-            op.getIndex(),
-            op.getKeySet(),
-            op.getColumns());
-      }
-      // Read from table was selected.
-      return tx.partitionRead(
-          op.getPartitionOptions(), op.getTable(), op.getKeySet(), op.getColumns());
-    }
-
-    private List<Partition> executeWithPriority(
-        ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority rpcPriority) {
-      // Query was selected.
-      if (op.getQuery() != null) {
-        return tx.partitionQuery(
-            op.getPartitionOptions(), op.getQuery(), Options.priority(rpcPriority));
+      ReadOperation op = c.element();
+
+      // While this creates a ServiceCallMetric for every input element, in reality, the number
+      // of input elements will either be very few (normally 1!), or they will differ and
+      // need different metrics.
+      ServiceCallMetric metric = ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+      List<Partition> partitions;
+      try {
+        if (op.getQuery() != null) {
+          // Query was selected.
+          partitions =
+              batchTx.partitionQuery(
+                  op.getPartitionOptions(),
+                  op.getQuery(),
+                  Options.priority(config.getRpcPriority().get()));
+        } else if (op.getIndex() != null) {
+          // Read with index was selected.
+          partitions =
+              batchTx.partitionReadUsingIndex(
+                  op.getPartitionOptions(),
+                  op.getTable(),
+                  op.getIndex(),
+                  op.getKeySet(),
+                  op.getColumns(),
+                  Options.priority(config.getRpcPriority().get()));
+        } else {
+          // Read from table was selected.
+          partitions =
+              batchTx.partitionRead(
+                  op.getPartitionOptions(),
+                  op.getTable(),
+                  op.getKeySet(),
+                  op.getColumns(),
+                  Options.priority(config.getRpcPriority().get()));

Review Comment:
   Ditto.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       Transaction tx = c.sideInput(txView);
-      BatchReadOnlyTransaction context =
+      BatchReadOnlyTransaction batchTx =
           spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
-      for (Partition p : execute(c.element(), context)) {
-        c.output(p);
-      }
-    }
-
-    private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction tx) {
-      if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) {
-        return executeWithPriority(op, tx, config.getRpcPriority().get());
-      } else {
-        return executeWithoutPriority(op, tx);
-      }
-    }
-
-    private List<Partition> executeWithoutPriority(ReadOperation op, BatchReadOnlyTransaction tx) {
-      // Query was selected.
-      if (op.getQuery() != null) {
-        return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
-      }
-      // Read with index was selected.
-      if (op.getIndex() != null) {
-        return tx.partitionReadUsingIndex(
-            op.getPartitionOptions(),
-            op.getTable(),
-            op.getIndex(),
-            op.getKeySet(),
-            op.getColumns());
-      }
-      // Read from table was selected.
-      return tx.partitionRead(
-          op.getPartitionOptions(), op.getTable(), op.getKeySet(), op.getColumns());
-    }
-
-    private List<Partition> executeWithPriority(
-        ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority rpcPriority) {
-      // Query was selected.
-      if (op.getQuery() != null) {
-        return tx.partitionQuery(
-            op.getPartitionOptions(), op.getQuery(), Options.priority(rpcPriority));
+      ReadOperation op = c.element();
+
+      // While this creates a ServiceCallMetric for every input element, in reality, the number
+      // of input elements will either be very few (normally 1!), or they will differ and
+      // need different metrics.
+      ServiceCallMetric metric = ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+      List<Partition> partitions;
+      try {

Review Comment:
   Is the "executeWithoutPriority" path no longer needed?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -377,6 +382,12 @@ public class SpannerIO {
   // Multiple of mutation size to use to gather and sort mutations
   private static final int DEFAULT_GROUPING_FACTOR = 1000;
 
+  // Size of caches for read/write ServiceCallMetric objects .
+  // This is a reasonable limit, as for reads, each worker will process very few different table
+  // read requests, and for writes, batching will ensure that write operations for the same
+  // table occur at
+  public static final int METRICS_CACHE_SIZE = 100;

Review Comment:
   Can this be package-private?
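
   For context, a size limit like this typically bounds a per-worker cache of metric objects. The sketch below shows one common way to build such a bounded cache with the JDK's LinkedHashMap in access order. BoundedMetricCache is a hypothetical class written for this example and is not necessarily how the PR implements its caches; the constant is declared without a modifier to show the package-private form.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical size-bounded LRU cache for illustration; not the PR's implementation. */
final class BoundedMetricCache<K, V> extends LinkedHashMap<K, V> {
  private static final long serialVersionUID = 1L;

  // No access modifier: visible only inside the declaring package.
  static final int METRICS_CACHE_SIZE = 100;

  private final int maxSize;

  BoundedMetricCache(int maxSize) {
    // accessOrder = true keeps the least recently used entry first.
    super(16, 0.75f, true);
    this.maxSize = maxSize;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Called after each insertion; evicts the least recently used entry
    // once the cache grows past its limit.
    return size() > maxSize;
  }
}
```

   Note that LinkedHashMap is not thread-safe, so a cache like this would need external synchronization if shared between threads.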





Issue Time Tracking
-------------------

            Worklog Id:     (was: 772191)
    Remaining Estimate: 37h 10m  (was: 37h 20m)
            Time Spent: 2h 50m  (was: 2h 40m)

> Incorrect Spanner IO Request Count metrics
> ------------------------------------------
>
>                 Key: BEAM-14121
>                 URL: https://issues.apache.org/jira/browse/BEAM-14121
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.34.0
>            Reporter: Niel Markwick
>            Assignee: Niel Markwick
>            Priority: P2
>              Labels: google-cloud-spanner
>   Original Estimate: 40h
>          Time Spent: 2h 50m
>  Remaining Estimate: 37h 10m
>
> IO request count metrics are calculated incorrectly for GCP Spanner.
>  
> The resource ID is formulated incorrectly.
> *Spanner Table:*
> //spanner.googleapis.com/projects/{projectId}/*topics*/{databaseId}/tables/{tableId}
> should be
> //spanner.googleapis.com/projects/{projectId}/instances/{instanceId}/databases/{databaseId}/tables/{tableId}
> and is populated incorrectly: the instance ID is used in place of the table ID.
> Spanner SQL Query:
> //spanner.googleapis.com/projects/{projectId}/queries/{queryName}
> should be
> //spanner.googleapis.com/projects/{projectId}/instances/{instanceId}/queries/{queryName}
> and queryName is nullable, which causes issues downstream.
> This is not actually populated at all: queries are logged as reads on an instance.
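
The corrected resource ID formats described in the issue can be sketched as plain string formatting. SpannerResourceIds and its methods are hypothetical names chosen for this example, not Beam's API. Omitting the trailing segment when queryName is null is an assumption made here for illustration; the issue only says that the nullable name causes problems downstream.

```java
/** Hypothetical helper for illustration; not part of Beam's API. */
final class SpannerResourceIds {
  private SpannerResourceIds() {}

  /** Builds the table resource ID in the format the issue says is correct. */
  static String tableResource(
      String projectId, String instanceId, String databaseId, String tableId) {
    return String.format(
        "//spanner.googleapis.com/projects/%s/instances/%s/databases/%s/tables/%s",
        projectId, instanceId, databaseId, tableId);
  }

  /**
   * Builds the query resource ID in the format the issue says is correct.
   * Assumption: a null queryName is handled by leaving the name segment off.
   */
  static String queryResource(String projectId, String instanceId, String queryName) {
    String base =
        String.format(
            "//spanner.googleapis.com/projects/%s/instances/%s/queries", projectId, instanceId);
    return queryName == null ? base : base + "/" + queryName;
  }
}
```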



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
