[ 
https://issues.apache.org/jira/browse/BEAM-14121?focusedWorklogId=775869&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-775869
 ]

ASF GitHub Bot logged work on BEAM-14121:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/May/22 11:38
            Start Date: 30/May/22 11:38
    Worklog Time Spent: 10m 
      Work Description: nielm commented on code in PR #17335:
URL: https://github.com/apache/beam/pull/17335#discussion_r884734522


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       Transaction tx = c.sideInput(txView);
-      BatchReadOnlyTransaction context =
+      BatchReadOnlyTransaction batchTx =
           
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
-      for (Partition p : execute(c.element(), context)) {
-        c.output(p);
-      }
-    }
-
-    private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction 
tx) {
-      if (config.getRpcPriority() != null && config.getRpcPriority().get() != 
null) {
-        return executeWithPriority(op, tx, config.getRpcPriority().get());
-      } else {
-        return executeWithoutPriority(op, tx);
-      }
-    }
-
-    private List<Partition> executeWithoutPriority(ReadOperation op, 
BatchReadOnlyTransaction tx) {
-      // Query was selected.
-      if (op.getQuery() != null) {
-        return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
-      }
-      // Read with index was selected.
-      if (op.getIndex() != null) {
-        return tx.partitionReadUsingIndex(
-            op.getPartitionOptions(),
-            op.getTable(),
-            op.getIndex(),
-            op.getKeySet(),
-            op.getColumns());
-      }
-      // Read from table was selected.
-      return tx.partitionRead(
-          op.getPartitionOptions(), op.getTable(), op.getKeySet(), 
op.getColumns());
-    }
-
-    private List<Partition> executeWithPriority(
-        ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority 
rpcPriority) {
-      // Query was selected.
-      if (op.getQuery() != null) {
-        return tx.partitionQuery(
-            op.getPartitionOptions(), op.getQuery(), 
Options.priority(rpcPriority));
+      ReadOperation op = c.element();
+
+      // While this creates a ServiceCallMetric for every input element, in 
reality, the number
+      // of input elements will either be very few (normally 1!), or they will 
differ and
+      // need different metrics.
+      ServiceCallMetric metric = 
ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+      List<Partition> partitions;
+      try {
+        if (op.getQuery() != null) {
+          // Query was selected.
+          partitions =
+              batchTx.partitionQuery(
+                  op.getPartitionOptions(),
+                  op.getQuery(),
+                  Options.priority(config.getRpcPriority().get()));
+        } else if (op.getIndex() != null) {
+          // Read with index was selected.
+          partitions =
+              batchTx.partitionReadUsingIndex(
+                  op.getPartitionOptions(),
+                  op.getTable(),
+                  op.getIndex(),
+                  op.getKeySet(),
+                  op.getColumns(),
+                  Options.priority(config.getRpcPriority().get()));

Review Comment:
   see above - config.getRpcPriority can no longer be null. 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       Transaction tx = c.sideInput(txView);
-      BatchReadOnlyTransaction context =
+      BatchReadOnlyTransaction batchTx =
           
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
-      for (Partition p : execute(c.element(), context)) {
-        c.output(p);
-      }
-    }
-
-    private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction 
tx) {
-      if (config.getRpcPriority() != null && config.getRpcPriority().get() != 
null) {
-        return executeWithPriority(op, tx, config.getRpcPriority().get());
-      } else {
-        return executeWithoutPriority(op, tx);
-      }
-    }
-
-    private List<Partition> executeWithoutPriority(ReadOperation op, 
BatchReadOnlyTransaction tx) {
-      // Query was selected.
-      if (op.getQuery() != null) {
-        return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
-      }
-      // Read with index was selected.
-      if (op.getIndex() != null) {
-        return tx.partitionReadUsingIndex(
-            op.getPartitionOptions(),
-            op.getTable(),
-            op.getIndex(),
-            op.getKeySet(),
-            op.getColumns());
-      }
-      // Read from table was selected.
-      return tx.partitionRead(
-          op.getPartitionOptions(), op.getTable(), op.getKeySet(), 
op.getColumns());
-    }
-
-    private List<Partition> executeWithPriority(
-        ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority 
rpcPriority) {
-      // Query was selected.
-      if (op.getQuery() != null) {
-        return tx.partitionQuery(
-            op.getPartitionOptions(), op.getQuery(), 
Options.priority(rpcPriority));
+      ReadOperation op = c.element();
+
+      // While this creates a ServiceCallMetric for every input element, in 
reality, the number
+      // of input elements will either be very few (normally 1!), or they will 
differ and
+      // need different metrics.
+      ServiceCallMetric metric = 
ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+      List<Partition> partitions;
+      try {

Review Comment:
   Normally config.getRpcPriority() should never be null - a default is set in 
SpannerConfig.create().
   However, I realise it is possible to pass null as a value (or as a null 
ValueProvider)  to SpannerConfig.setRpcPriority().
   
   This (IMHO) is incorrect. There is a Preconditions.checkNotNull above to 
ensure that getRpcPriority is non-null, and I have also added this same check 
to SpannerConfig.
   
   





Issue Time Tracking
-------------------

            Worklog Id:     (was: 775869)
    Remaining Estimate: 36h 50m  (was: 37h)
            Time Spent: 3h 10m  (was: 3h)

> Incorrect Spanner IO Request Count metrics
> ------------------------------------------
>
>                 Key: BEAM-14121
>                 URL: https://issues.apache.org/jira/browse/BEAM-14121
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.34.0
>            Reporter: Niel Markwick
>            Assignee: Niel Markwick
>            Priority: P2
>              Labels: google-cloud-spanner
>   Original Estimate: 40h
>          Time Spent: 3h 10m
>  Remaining Estimate: 36h 50m
>
> IO request count metrics calculated incorrectly for GCP Spanner
>  
> Resource ID is formulated incorrectly
> *Spanner Table:*
> {{//spanner.googleapis.com/projects/\{projectId}/{*}topics{*}/\{databaseId}/tables/\{tableId}}}
> should be
> {{//spanner.googleapis.com/projects/\{projectId}/instances/\{instanceId}/databases/\{databaseId}/tables/\{tableId}}}
> and is populated incorrectly – instance ID is used in place of tableID
> Spanner SQL Query:
> {{//spanner.googleapis.com/projects/\{projectId}/queries/\{queryName} }}
> {{should be}}
> {{{}//spanner.googleapis.com/projects/\{projectId}/{}}}{{{}instances/\{instanceId}/queries{}}}{{{}/\{queryName}
>  {}}}
> and queryName is nullable which cause issued downstream
> this is not actually populated at all - queries are logged as reads on an 
> instance. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to