nielm commented on code in PR #17335:
URL: https://github.com/apache/beam/pull/17335#discussion_r884734522
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
- BatchReadOnlyTransaction context =
+ BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
- for (Partition p : execute(c.element(), context)) {
- c.output(p);
- }
- }
-
- private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction
tx) {
- if (config.getRpcPriority() != null && config.getRpcPriority().get() !=
null) {
- return executeWithPriority(op, tx, config.getRpcPriority().get());
- } else {
- return executeWithoutPriority(op, tx);
- }
- }
-
- private List<Partition> executeWithoutPriority(ReadOperation op,
BatchReadOnlyTransaction tx) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
- }
- // Read with index was selected.
- if (op.getIndex() != null) {
- return tx.partitionReadUsingIndex(
- op.getPartitionOptions(),
- op.getTable(),
- op.getIndex(),
- op.getKeySet(),
- op.getColumns());
- }
- // Read from table was selected.
- return tx.partitionRead(
- op.getPartitionOptions(), op.getTable(), op.getKeySet(),
op.getColumns());
- }
-
- private List<Partition> executeWithPriority(
- ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority
rpcPriority) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(
- op.getPartitionOptions(), op.getQuery(),
Options.priority(rpcPriority));
+ ReadOperation op = c.element();
+
+ // While this creates a ServiceCallMetric for every input element, in
reality, the number
+ // of input elements will either be very few (normally 1!), or they will
differ and
+ // need different metrics.
+ ServiceCallMetric metric =
ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+ List<Partition> partitions;
+ try {
+ if (op.getQuery() != null) {
+ // Query was selected.
+ partitions =
+ batchTx.partitionQuery(
+ op.getPartitionOptions(),
+ op.getQuery(),
+ Options.priority(config.getRpcPriority().get()));
+ } else if (op.getIndex() != null) {
+ // Read with index was selected.
+ partitions =
+ batchTx.partitionReadUsingIndex(
+ op.getPartitionOptions(),
+ op.getTable(),
+ op.getIndex(),
+ op.getKeySet(),
+ op.getColumns(),
+ Options.priority(config.getRpcPriority().get()));
Review Comment:
see above - config.getRpcPriority can no longer be null.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java:
##########
@@ -114,75 +132,62 @@ public void teardown() throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
- BatchReadOnlyTransaction context =
+ BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
- for (Partition p : execute(c.element(), context)) {
- c.output(p);
- }
- }
-
- private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction
tx) {
- if (config.getRpcPriority() != null && config.getRpcPriority().get() !=
null) {
- return executeWithPriority(op, tx, config.getRpcPriority().get());
- } else {
- return executeWithoutPriority(op, tx);
- }
- }
-
- private List<Partition> executeWithoutPriority(ReadOperation op,
BatchReadOnlyTransaction tx) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
- }
- // Read with index was selected.
- if (op.getIndex() != null) {
- return tx.partitionReadUsingIndex(
- op.getPartitionOptions(),
- op.getTable(),
- op.getIndex(),
- op.getKeySet(),
- op.getColumns());
- }
- // Read from table was selected.
- return tx.partitionRead(
- op.getPartitionOptions(), op.getTable(), op.getKeySet(),
op.getColumns());
- }
-
- private List<Partition> executeWithPriority(
- ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority
rpcPriority) {
- // Query was selected.
- if (op.getQuery() != null) {
- return tx.partitionQuery(
- op.getPartitionOptions(), op.getQuery(),
Options.priority(rpcPriority));
+ ReadOperation op = c.element();
+
+ // While this creates a ServiceCallMetric for every input element, in
reality, the number
+ // of input elements will either be very few (normally 1!), or they will
differ and
+ // need different metrics.
+ ServiceCallMetric metric =
ReadAll.buildServiceCallMetricForReadOp(config, op);
+
+ List<Partition> partitions;
+ try {
Review Comment:
Normally config.getRpcPriority() should never be null - a default is set in
SpannerConfig.create().
However, I realise it is possible to pass null as a value (or as a null
ValueProvider) to SpannerConfig.setRpcPriority().
This (IMHO) is incorrect. There is a Preconditions.checkNotNull above to
ensure that getRpcPriority is non-null, and I have also added this same check
to SpannerConfig.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]