[
https://issues.apache.org/jira/browse/BEAM-14121?focusedWorklogId=775869&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-775869
]
ASF GitHub Bot logged work on BEAM-14121:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 30/May/22 11:38
Start Date: 30/May/22 11:38
Worklog Time Spent: 10m
Work Description: nielm commented on code in PR #17335:
URL: https://github.com/apache/beam/pull/17335#discussion_r884734522
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
- BatchReadOnlyTransaction context =
+ BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
- for (Partition p : execute(c.element(), context)) {
- c.output(p);
- }
- }
-
- private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction tx) {
- if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) {
- return executeWithPriority(op, tx, config.getRpcPriority().get());
- } else {
- return executeWithoutPriority(op, tx);
- }
- }
-
- private List<Partition> executeWithoutPriority(ReadOperation op, BatchReadOnlyTransaction tx) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
- }
- // Read with index was selected.
- if (op.getIndex() != null) {
- return tx.partitionReadUsingIndex(
- op.getPartitionOptions(),
- op.getTable(),
- op.getIndex(),
- op.getKeySet(),
- op.getColumns());
- }
- // Read from table was selected.
- return tx.partitionRead(
- op.getPartitionOptions(), op.getTable(), op.getKeySet(), op.getColumns());
- }
-
- private List<Partition> executeWithPriority(
- ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority rpcPriority) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(
- op.getPartitionOptions(), op.getQuery(), Options.priority(rpcPriority));
+ ReadOperation op = c.element();
+
+ // While this creates a ServiceCallMetric for every input element, in reality, the number
+ // of input elements will either be very few (normally 1!), or they will differ and
+ // need different metrics.
+ ServiceCallMetric metric = ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+ List<Partition> partitions;
+ try {
+ if (op.getQuery() != null) {
+ // Query was selected.
+ partitions =
+ batchTx.partitionQuery(
+ op.getPartitionOptions(),
+ op.getQuery(),
+ Options.priority(config.getRpcPriority().get()));
+ } else if (op.getIndex() != null) {
+ // Read with index was selected.
+ partitions =
+ batchTx.partitionReadUsingIndex(
+ op.getPartitionOptions(),
+ op.getTable(),
+ op.getIndex(),
+ op.getKeySet(),
+ op.getColumns(),
+ Options.priority(config.getRpcPriority().get()));
Review Comment:
see above - config.getRpcPriority can no longer be null.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
- BatchReadOnlyTransaction context =
+ BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
- for (Partition p : execute(c.element(), context)) {
- c.output(p);
- }
- }
-
- private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction tx) {
- if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) {
- return executeWithPriority(op, tx, config.getRpcPriority().get());
- } else {
- return executeWithoutPriority(op, tx);
- }
- }
-
- private List<Partition> executeWithoutPriority(ReadOperation op, BatchReadOnlyTransaction tx) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
- }
- // Read with index was selected.
- if (op.getIndex() != null) {
- return tx.partitionReadUsingIndex(
- op.getPartitionOptions(),
- op.getTable(),
- op.getIndex(),
- op.getKeySet(),
- op.getColumns());
- }
- // Read from table was selected.
- return tx.partitionRead(
- op.getPartitionOptions(), op.getTable(), op.getKeySet(), op.getColumns());
- }
-
- private List<Partition> executeWithPriority(
- ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority rpcPriority) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(
- op.getPartitionOptions(), op.getQuery(), Options.priority(rpcPriority));
+ ReadOperation op = c.element();
+
+ // While this creates a ServiceCallMetric for every input element, in reality, the number
+ // of input elements will either be very few (normally 1!), or they will differ and
+ // need different metrics.
+ ServiceCallMetric metric = ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+ List<Partition> partitions;
+ try {
Review Comment:
Normally config.getRpcPriority() should never be null - a default is set in
SpannerConfig.create().
However, I realise it is possible to pass null as a value (or as a null
ValueProvider) to SpannerConfig.setRpcPriority().
This (IMHO) is incorrect. There is a Preconditions.checkNotNull above to
ensure that getRpcPriority is non-null, and I have also added this same check
to SpannerConfig.
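
A minimal sketch of the pattern described in this comment: set a default at creation and reject null in the setter, so later code can call the getter without a null branch. The `Config` class, the `Priority` enum, and the default value used here are hypothetical stand-ins, not the real `SpannerConfig` or Spanner `RpcPriority`, and `Objects.requireNonNull` stands in for Guava's `Preconditions.checkNotNull` to keep the example dependency-free.

```java
import java.util.Objects;

public class RpcPriorityCheckSketch {
  // Hypothetical stand-in for the Spanner client's RpcPriority enum.
  public enum Priority { LOW, MEDIUM, HIGH }

  // Hypothetical, simplified stand-in for SpannerConfig; not the real Beam class.
  public static final class Config {
    private final Priority rpcPriority;

    private Config(Priority rpcPriority) {
      this.rpcPriority = rpcPriority;
    }

    // A default is always set at creation. The value chosen here is an assumption.
    public static Config create() {
      return new Config(Priority.MEDIUM);
    }

    // Rejects null up front, so getRpcPriority() can never return null.
    public Config withRpcPriority(Priority priority) {
      return new Config(Objects.requireNonNull(priority, "rpcPriority must not be null"));
    }

    public Priority getRpcPriority() {
      return rpcPriority;
    }
  }

  public static void main(String[] args) {
    Config config = Config.create().withRpcPriority(Priority.HIGH);
    // No null check needed before use.
    System.out.println(config.getRpcPriority());

    try {
      Config.create().withRpcPriority(null);
    } catch (NullPointerException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

With the check in the setter, the single code path in processElement can pass the priority to every partition call unconditionally.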
Issue Time Tracking
-------------------
Worklog Id: (was: 775869)
Remaining Estimate: 36h 50m (was: 37h)
Time Spent: 3h 10m (was: 3h)
> Incorrect Spanner IO Request Count metrics
> ------------------------------------------
>
> Key: BEAM-14121
> URL: https://issues.apache.org/jira/browse/BEAM-14121
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.34.0
> Reporter: Niel Markwick
> Assignee: Niel Markwick
> Priority: P2
> Labels: google-cloud-spanner
> Original Estimate: 40h
> Time Spent: 3h 10m
> Remaining Estimate: 36h 50m
>
> IO request count metrics are calculated incorrectly for GCP Spanner.
>
> The resource ID is formulated incorrectly.
> *Spanner Table:*
> {{//spanner.googleapis.com/projects/\{projectId}/{*}topics{*}/\{databaseId}/tables/\{tableId}}}
> should be
> {{//spanner.googleapis.com/projects/\{projectId}/instances/\{instanceId}/databases/\{databaseId}/tables/\{tableId}}}
> and is populated incorrectly: the instance ID is used in place of the table ID.
> *Spanner SQL Query:*
> {{//spanner.googleapis.com/projects/\{projectId}/queries/\{queryName}}}
> should be
> {{//spanner.googleapis.com/projects/\{projectId}/instances/\{instanceId}/queries/\{queryName}}}
> and queryName is nullable, which causes issues downstream.
> This is not actually populated at all: queries are logged as reads on an
> instance.
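
The corrected resource ID formats from the issue description can be sketched as two small helpers. This is an illustration only: the class name, method names, and the `unnamed` placeholder used when queryName is null are assumptions, not the code in the PR.

```java
public class SpannerResourceIdSketch {
  // Hypothetical placeholder for a missing query name; the real fix may differ.
  static final String UNNAMED_QUERY = "unnamed";

  // Builds the table resource ID in the corrected format from the issue.
  public static String tableResourceId(
      String projectId, String instanceId, String databaseId, String tableId) {
    return String.format(
        "//spanner.googleapis.com/projects/%s/instances/%s/databases/%s/tables/%s",
        projectId, instanceId, databaseId, tableId);
  }

  // Builds the query resource ID in the corrected format from the issue.
  // queryName may be null, so a placeholder is substituted (an assumption).
  public static String queryResourceId(String projectId, String instanceId, String queryName) {
    String name = (queryName == null) ? UNNAMED_QUERY : queryName;
    return String.format(
        "//spanner.googleapis.com/projects/%s/instances/%s/queries/%s",
        projectId, instanceId, name);
  }

  public static void main(String[] args) {
    System.out.println(tableResourceId("my-project", "my-instance", "my-db", "users"));
    System.out.println(queryResourceId("my-project", "my-instance", null));
  }
}
```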