[
https://issues.apache.org/jira/browse/BEAM-14121?focusedWorklogId=772191&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772191
]
ASF GitHub Bot logged work on BEAM-14121:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 19/May/22 00:02
Start Date: 19/May/22 00:02
Worklog Time Spent: 10m
Work Description: chamikaramj commented on code in PR #17335:
URL: https://github.com/apache/beam/pull/17335#discussion_r876468653
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -377,6 +382,12 @@ public class SpannerIO {
// Multiple of mutation size to use to gather and sort mutations
private static final int DEFAULT_GROUPING_FACTOR = 1000;
+ // Size of caches for read/write ServiceCallMetric objects .
+ // This is a reasonable limit, as for reads, each worker will process very
few different table
+ // read requests, and for writes, batching will ensure that write operations
for the same
+ // table occur at
Review Comment:
Incomplete comment.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
- BatchReadOnlyTransaction context =
+ BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
- for (Partition p : execute(c.element(), context)) {
- c.output(p);
- }
- }
-
- private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction
tx) {
- if (config.getRpcPriority() != null && config.getRpcPriority().get() !=
null) {
- return executeWithPriority(op, tx, config.getRpcPriority().get());
- } else {
- return executeWithoutPriority(op, tx);
- }
- }
-
- private List<Partition> executeWithoutPriority(ReadOperation op,
BatchReadOnlyTransaction tx) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
- }
- // Read with index was selected.
- if (op.getIndex() != null) {
- return tx.partitionReadUsingIndex(
- op.getPartitionOptions(),
- op.getTable(),
- op.getIndex(),
- op.getKeySet(),
- op.getColumns());
- }
- // Read from table was selected.
- return tx.partitionRead(
- op.getPartitionOptions(), op.getTable(), op.getKeySet(),
op.getColumns());
- }
-
- private List<Partition> executeWithPriority(
- ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority
rpcPriority) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(
- op.getPartitionOptions(), op.getQuery(),
Options.priority(rpcPriority));
+ ReadOperation op = c.element();
+
+ // While this creates a ServiceCallMetric for every input element, in
reality, the number
+ // of input elements will either be very few (normally 1!), or they will
differ and
+ // need different metrics.
+ ServiceCallMetric metric =
ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+ List<Partition> partitions;
+ try {
+ if (op.getQuery() != null) {
+ // Query was selected.
+ partitions =
+ batchTx.partitionQuery(
+ op.getPartitionOptions(),
+ op.getQuery(),
+ Options.priority(config.getRpcPriority().get()));
+ } else if (op.getIndex() != null) {
+ // Read with index was selected.
+ partitions =
+ batchTx.partitionReadUsingIndex(
+ op.getPartitionOptions(),
+ op.getTable(),
+ op.getIndex(),
+ op.getKeySet(),
+ op.getColumns(),
+ Options.priority(config.getRpcPriority().get()));
Review Comment:
Previously this was called without a priority if config.getRpcPriority() == null or
config.getRpcPriority().get() == null.
Are we sure that this will not introduce a behavior change?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
- BatchReadOnlyTransaction context =
+ BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
- for (Partition p : execute(c.element(), context)) {
- c.output(p);
- }
- }
-
- private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction
tx) {
- if (config.getRpcPriority() != null && config.getRpcPriority().get() !=
null) {
- return executeWithPriority(op, tx, config.getRpcPriority().get());
- } else {
- return executeWithoutPriority(op, tx);
- }
- }
-
- private List<Partition> executeWithoutPriority(ReadOperation op,
BatchReadOnlyTransaction tx) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
- }
- // Read with index was selected.
- if (op.getIndex() != null) {
- return tx.partitionReadUsingIndex(
- op.getPartitionOptions(),
- op.getTable(),
- op.getIndex(),
- op.getKeySet(),
- op.getColumns());
- }
- // Read from table was selected.
- return tx.partitionRead(
- op.getPartitionOptions(), op.getTable(), op.getKeySet(),
op.getColumns());
- }
-
- private List<Partition> executeWithPriority(
- ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority
rpcPriority) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(
- op.getPartitionOptions(), op.getQuery(),
Options.priority(rpcPriority));
+ ReadOperation op = c.element();
+
+ // While this creates a ServiceCallMetric for every input element, in
reality, the number
+ // of input elements will either be very few (normally 1!), or they will
differ and
+ // need different metrics.
+ ServiceCallMetric metric =
ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+ List<Partition> partitions;
+ try {
+ if (op.getQuery() != null) {
+ // Query was selected.
+ partitions =
+ batchTx.partitionQuery(
+ op.getPartitionOptions(),
+ op.getQuery(),
+ Options.priority(config.getRpcPriority().get()));
+ } else if (op.getIndex() != null) {
+ // Read with index was selected.
+ partitions =
+ batchTx.partitionReadUsingIndex(
+ op.getPartitionOptions(),
+ op.getTable(),
+ op.getIndex(),
+ op.getKeySet(),
+ op.getColumns(),
+ Options.priority(config.getRpcPriority().get()));
+ } else {
+ // Read from table was selected.
+ partitions =
+ batchTx.partitionRead(
+ op.getPartitionOptions(),
+ op.getTable(),
+ op.getKeySet(),
+ op.getColumns(),
+ Options.priority(config.getRpcPriority().get()));
Review Comment:
Ditto.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
- BatchReadOnlyTransaction context =
+ BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
- for (Partition p : execute(c.element(), context)) {
- c.output(p);
- }
- }
-
- private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction
tx) {
- if (config.getRpcPriority() != null && config.getRpcPriority().get() !=
null) {
- return executeWithPriority(op, tx, config.getRpcPriority().get());
- } else {
- return executeWithoutPriority(op, tx);
- }
- }
-
- private List<Partition> executeWithoutPriority(ReadOperation op,
BatchReadOnlyTransaction tx) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
- }
- // Read with index was selected.
- if (op.getIndex() != null) {
- return tx.partitionReadUsingIndex(
- op.getPartitionOptions(),
- op.getTable(),
- op.getIndex(),
- op.getKeySet(),
- op.getColumns());
- }
- // Read from table was selected.
- return tx.partitionRead(
- op.getPartitionOptions(), op.getTable(), op.getKeySet(),
op.getColumns());
- }
-
- private List<Partition> executeWithPriority(
- ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority
rpcPriority) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(
- op.getPartitionOptions(), op.getQuery(),
Options.priority(rpcPriority));
+ ReadOperation op = c.element();
+
+ // While this creates a ServiceCallMetric for every input element, in
reality, the number
+ // of input elements will either be very few (normally 1!), or they will
differ and
+ // need different metrics.
+ ServiceCallMetric metric =
ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+ List<Partition> partitions;
+ try {
Review Comment:
Is the "executeWithoutPriority" path no longer needed?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -377,6 +382,12 @@ public class SpannerIO {
// Multiple of mutation size to use to gather and sort mutations
private static final int DEFAULT_GROUPING_FACTOR = 1000;
+ // Size of caches for read/write ServiceCallMetric objects .
+ // This is a reasonable limit, as for reads, each worker will process very
few different table
+ // read requests, and for writes, batching will ensure that write operations
for the same
+ // table occur at
+ public static final int METRICS_CACHE_SIZE = 100;
Review Comment:
Can this be package-private?
Issue Time Tracking
-------------------
Worklog Id: (was: 772191)
Remaining Estimate: 37h 10m (was: 37h 20m)
Time Spent: 2h 50m (was: 2h 40m)
> Incorrect Spanner IO Request Count metrics
> ------------------------------------------
>
> Key: BEAM-14121
> URL: https://issues.apache.org/jira/browse/BEAM-14121
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.34.0
> Reporter: Niel Markwick
> Assignee: Niel Markwick
> Priority: P2
> Labels: google-cloud-spanner
> Original Estimate: 40h
> Time Spent: 2h 50m
> Remaining Estimate: 37h 10m
>
> IO request count metrics are calculated incorrectly for GCP Spanner.
>
> The resource ID is formulated incorrectly.
> *Spanner Table:*
> {{//spanner.googleapis.com/projects/\{projectId}/{*}topics{*}/\{databaseId}/tables/\{tableId}}}
> should be
> {{//spanner.googleapis.com/projects/\{projectId}/instances/\{instanceId}/databases/\{databaseId}/tables/\{tableId}}}
> and it is populated incorrectly – the instance ID is used in place of the table ID.
> *Spanner SQL Query:*
> {{//spanner.googleapis.com/projects/\{projectId}/queries/\{queryName}}}
> should be
> {{//spanner.googleapis.com/projects/\{projectId}/instances/\{instanceId}/queries/\{queryName}}}
> and queryName is nullable, which causes issues downstream.
> In fact, it is not actually populated at all – queries are logged as reads on an
> instance.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)