chamikaramj commented on code in PR #17335:
URL: https://github.com/apache/beam/pull/17335#discussion_r876468653
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -377,6 +382,12 @@ public class SpannerIO {
// Multiple of mutation size to use to gather and sort mutations
private static final int DEFAULT_GROUPING_FACTOR = 1000;
+ // Size of caches for read/write ServiceCallMetric objects .
+ // This is a reasonable limit, as for reads, each worker will process very
few different table
+ // read requests, and for writes, batching will ensure that write operations
for the same
+ // table occur at
Review Comment:
Incomplete comment: the last sentence ends mid-thought ("...write operations for the same table occur at").
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
- BatchReadOnlyTransaction context =
+ BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
- for (Partition p : execute(c.element(), context)) {
- c.output(p);
- }
- }
-
- private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction
tx) {
- if (config.getRpcPriority() != null && config.getRpcPriority().get() !=
null) {
- return executeWithPriority(op, tx, config.getRpcPriority().get());
- } else {
- return executeWithoutPriority(op, tx);
- }
- }
-
- private List<Partition> executeWithoutPriority(ReadOperation op,
BatchReadOnlyTransaction tx) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
- }
- // Read with index was selected.
- if (op.getIndex() != null) {
- return tx.partitionReadUsingIndex(
- op.getPartitionOptions(),
- op.getTable(),
- op.getIndex(),
- op.getKeySet(),
- op.getColumns());
- }
- // Read from table was selected.
- return tx.partitionRead(
- op.getPartitionOptions(), op.getTable(), op.getKeySet(),
op.getColumns());
- }
-
- private List<Partition> executeWithPriority(
- ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority
rpcPriority) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(
- op.getPartitionOptions(), op.getQuery(),
Options.priority(rpcPriority));
+ ReadOperation op = c.element();
+
+ // While this creates a ServiceCallMetric for every input element, in
reality, the number
+ // of input elements will either be very few (normally 1!), or they will
differ and
+ // need different metrics.
+ ServiceCallMetric metric =
ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+ List<Partition> partitions;
+ try {
+ if (op.getQuery() != null) {
+ // Query was selected.
+ partitions =
+ batchTx.partitionQuery(
+ op.getPartitionOptions(),
+ op.getQuery(),
+ Options.priority(config.getRpcPriority().get()));
+ } else if (op.getIndex() != null) {
+ // Read with index was selected.
+ partitions =
+ batchTx.partitionReadUsingIndex(
+ op.getPartitionOptions(),
+ op.getTable(),
+ op.getIndex(),
+ op.getKeySet(),
+ op.getColumns(),
+ Options.priority(config.getRpcPriority().get()));
Review Comment:
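Previously, this was called without a priority if `config.getRpcPriority() == null` or
`config.getRpcPriority().get() == null`. Now `Options.priority(config.getRpcPriority().get())` is always passed, and if `config.getRpcPriority()` itself is null, this line throws a NullPointerException.
Are we sure that this will not introduce a behavior change?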
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
- BatchReadOnlyTransaction context =
+ BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
- for (Partition p : execute(c.element(), context)) {
- c.output(p);
- }
- }
-
- private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction
tx) {
- if (config.getRpcPriority() != null && config.getRpcPriority().get() !=
null) {
- return executeWithPriority(op, tx, config.getRpcPriority().get());
- } else {
- return executeWithoutPriority(op, tx);
- }
- }
-
- private List<Partition> executeWithoutPriority(ReadOperation op,
BatchReadOnlyTransaction tx) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
- }
- // Read with index was selected.
- if (op.getIndex() != null) {
- return tx.partitionReadUsingIndex(
- op.getPartitionOptions(),
- op.getTable(),
- op.getIndex(),
- op.getKeySet(),
- op.getColumns());
- }
- // Read from table was selected.
- return tx.partitionRead(
- op.getPartitionOptions(), op.getTable(), op.getKeySet(),
op.getColumns());
- }
-
- private List<Partition> executeWithPriority(
- ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority
rpcPriority) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(
- op.getPartitionOptions(), op.getQuery(),
Options.priority(rpcPriority));
+ ReadOperation op = c.element();
+
+ // While this creates a ServiceCallMetric for every input element, in
reality, the number
+ // of input elements will either be very few (normally 1!), or they will
differ and
+ // need different metrics.
+ ServiceCallMetric metric =
ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+ List<Partition> partitions;
+ try {
+ if (op.getQuery() != null) {
+ // Query was selected.
+ partitions =
+ batchTx.partitionQuery(
+ op.getPartitionOptions(),
+ op.getQuery(),
+ Options.priority(config.getRpcPriority().get()));
+ } else if (op.getIndex() != null) {
+ // Read with index was selected.
+ partitions =
+ batchTx.partitionReadUsingIndex(
+ op.getPartitionOptions(),
+ op.getTable(),
+ op.getIndex(),
+ op.getKeySet(),
+ op.getColumns(),
+ Options.priority(config.getRpcPriority().get()));
+ } else {
+ // Read from table was selected.
+ partitions =
+ batchTx.partitionRead(
+ op.getPartitionOptions(),
+ op.getTable(),
+ op.getKeySet(),
+ op.getColumns(),
+ Options.priority(config.getRpcPriority().get()));
Review Comment:
Ditto.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
- BatchReadOnlyTransaction context =
+ BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
- for (Partition p : execute(c.element(), context)) {
- c.output(p);
- }
- }
-
- private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction
tx) {
- if (config.getRpcPriority() != null && config.getRpcPriority().get() !=
null) {
- return executeWithPriority(op, tx, config.getRpcPriority().get());
- } else {
- return executeWithoutPriority(op, tx);
- }
- }
-
- private List<Partition> executeWithoutPriority(ReadOperation op,
BatchReadOnlyTransaction tx) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
- }
- // Read with index was selected.
- if (op.getIndex() != null) {
- return tx.partitionReadUsingIndex(
- op.getPartitionOptions(),
- op.getTable(),
- op.getIndex(),
- op.getKeySet(),
- op.getColumns());
- }
- // Read from table was selected.
- return tx.partitionRead(
- op.getPartitionOptions(), op.getTable(), op.getKeySet(),
op.getColumns());
- }
-
- private List<Partition> executeWithPriority(
- ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority
rpcPriority) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(
- op.getPartitionOptions(), op.getQuery(),
Options.priority(rpcPriority));
+ ReadOperation op = c.element();
+
+ // While this creates a ServiceCallMetric for every input element, in
reality, the number
+ // of input elements will either be very few (normally 1!), or they will
differ and
+ // need different metrics.
+ ServiceCallMetric metric =
ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+ List<Partition> partitions;
+ try {
Review Comment:
Is the `executeWithoutPriority` path no longer needed?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -377,6 +382,12 @@ public class SpannerIO {
// Multiple of mutation size to use to gather and sort mutations
private static final int DEFAULT_GROUPING_FACTOR = 1000;
+ // Size of caches for read/write ServiceCallMetric objects .
+ // This is a reasonable limit, as for reads, each worker will process very
few different table
+ // read requests, and for writes, batching will ensure that write operations
for the same
+ // table occur at
+ public static final int METRICS_CACHE_SIZE = 100;
Review Comment:
Can this be package-private?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]