boyuanzz commented on a change in pull request #14811:
URL: https://github.com/apache/beam/pull/14811#discussion_r648688115



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java
##########
@@ -73,18 +74,16 @@ public static BatchSpannerRead create(
         .apply(
             "Generate Partitions",
             ParDo.of(new GeneratePartitionsFn(getSpannerConfig(), txView)).withSideInputs(txView))
-        .apply("Shuffle partitions", Reshuffle.<Partition>viaRandomKey())
         .apply(
             "Read from Partitions",
             ParDo.of(new ReadFromPartitionFn(getSpannerConfig(), txView)).withSideInputs(txView));
   }
 
   @VisibleForTesting
-  static class GeneratePartitionsFn extends DoFn<ReadOperation, Partition> {
+  static class GeneratePartitionsFn extends DoFn<ReadOperation, List<Partition>> {

Review comment:
       We can remove `GeneratePartitionsFn` as well by using
`@GetInitialRestriction` and `@SplitRestriction` in your Splittable DoFn.
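
       As a rough illustration of the splitting idea only, here is a plain-Java sketch with no Beam dependency. `RangeSplitSketch`, `split`, and `chunkSize` are hypothetical names, not Beam APIs; in the real DoFn this logic would sit in the method annotated with `@SplitRestriction`, and the initial range would come from the method annotated with `@GetInitialRestriction`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for restriction splitting; not part of Beam.
final class RangeSplitSketch {
  // Splits the half-open index range [from, to) into consecutive
  // half-open sub-ranges holding at most chunkSize indices each.
  static List<long[]> split(long from, long to, long chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<long[]> out = new ArrayList<>();
    for (long start = from; start < to; start += chunkSize) {
      out.add(new long[] {start, Math.min(start + chunkSize, to)});
    }
    return out;
  }

  public static void main(String[] args) {
    // Five partitions, at most two per sub-range.
    for (long[] r : split(0, 5, 2)) {
      System.out.println("[" + r[0] + ", " + r[1] + ")");
    }
  }
}
```

       Each sub-range could then be read independently, which is what makes the separate partition-generating step and the reshuffle unnecessary.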

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java
##########
@@ -157,19 +154,29 @@ public void teardown() throws Exception {
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
+    public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker)
+        throws Exception {
       Transaction tx = c.sideInput(txView);
 
       BatchReadOnlyTransaction batchTx =
           spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
 
-      Partition p = c.element();
-      try (ResultSet resultSet = batchTx.execute(p)) {
-        while (resultSet.next()) {
-          Struct s = resultSet.getCurrentRowAsStruct();
-          c.output(s);
+      List<Partition> partitions = c.element();
+      for (int i = (int) tracker.currentRestriction().getFrom(); i < partitions.size(); i++) {

Review comment:
       Should the loop bound be `i < (int) tracker.currentRestriction().getTo()`?
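
       To show what I mean, here is a simplified plain-Java model, not Beam code. `ClaimLoopSketch` is a hypothetical stand-in for the tracker, and strings stand in for `Partition` objects. The loop is bounded by the end of the restriction and stops at the first failed claim, so an instance that owns only part of the list never touches the other partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of an offset tracker over [from, to).
final class ClaimLoopSketch {
  private final long from;
  private final long to;

  ClaimLoopSketch(long from, long to) {
    this.from = from;
    this.to = to;
  }

  // A claim succeeds only for indices inside the half-open restriction.
  boolean tryClaim(long i) {
    return i >= from && i < to;
  }

  // Processes only the indices owned by the restriction [from, to).
  static List<String> process(List<String> partitions, long from, long to) {
    ClaimLoopSketch tracker = new ClaimLoopSketch(from, to);
    List<String> out = new ArrayList<>();
    for (long i = from; i < to; i++) {
      if (!tracker.tryClaim(i)) {
        break; // stop as soon as a claim fails
      }
      out.add(partitions.get((int) i));
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("p0", "p1", "p2", "p3");
    System.out.println(process(partitions, 1, 3)); // prints [p1, p2]
  }
}
```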

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java
##########
@@ -157,19 +154,29 @@ public void teardown() throws Exception {
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
+    public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker)
+        throws Exception {
       Transaction tx = c.sideInput(txView);
 
       BatchReadOnlyTransaction batchTx =
           spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
 
-      Partition p = c.element();
-      try (ResultSet resultSet = batchTx.execute(p)) {
-        while (resultSet.next()) {
-          Struct s = resultSet.getCurrentRowAsStruct();
-          c.output(s);
+      List<Partition> partitions = c.element();
+      for (int i = (int) tracker.currentRestriction().getFrom(); i < partitions.size(); i++) {
+        if (tracker.tryClaim(Long.valueOf(i))) {
+          try (ResultSet resultSet = batchTx.execute(partitions.get(i))) {
+            while (resultSet.next()) {
+              Struct s = resultSet.getCurrentRowAsStruct();
+              c.output(s);
+            }
+          }
         }
       }
     }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(@Element List<Partition> partitions) {
+      return new OffsetRange(0L, partitions.size());

Review comment:
       An `OffsetRange` is usually a half-open range, where
number >= start && number < end.
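
       A minimal plain-Java sketch of that convention follows. `HalfOpenRangeSketch` is a hypothetical model, not Beam's `OffsetRange`. With an exclusive end, a range built from 0 and the list size covers exactly the valid list indices.

```java
// Hypothetical model of a half-open range [from, to); not Beam's OffsetRange.
final class HalfOpenRangeSketch {
  final long from;
  final long to;

  HalfOpenRangeSketch(long from, long to) {
    if (from > to) {
      throw new IllegalArgumentException("from must be <= to");
    }
    this.from = from;
    this.to = to;
  }

  // The start is inclusive, the end is exclusive.
  boolean contains(long n) {
    return n >= from && n < to;
  }

  // Number of offsets covered by the range.
  long size() {
    return to - from;
  }

  public static void main(String[] args) {
    // Models a list of three partitions: valid indices are 0, 1, 2.
    HalfOpenRangeSketch r = new HalfOpenRangeSketch(0, 3);
    System.out.println(r.contains(2) + " " + r.contains(3) + " " + r.size());
    // prints: true false 3
  }
}
```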

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java
##########
@@ -133,7 +130,7 @@ public void processElement(ProcessContext c) throws Exception {
     }
   }
 
-  private static class ReadFromPartitionFn extends DoFn<Partition, Struct> {
+  private static class ReadFromPartitionFn extends DoFn<List<Partition>, Struct> {

Review comment:
       You may want to mark it as `@BoundedPerElement`.
   
   And are we able to provide better size information here? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

