[ https://issues.apache.org/jira/browse/BEAM-7495?focusedWorklogId=260683&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-260683 ]

ASF GitHub Bot logged work on BEAM-7495:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Jun/19 19:43
            Start Date: 14/Jun/19 19:43
    Worklog Time Spent: 10m 
      Work Description: aryann commented on pull request #8832: [BEAM-7495] Add dynamic worker rebalancing to BigQuery Storage
URL: https://github.com/apache/beam/pull/8832#discussion_r293950722
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
 ##########
 @@ -215,23 +223,101 @@ public T getCurrent() throws NoSuchElementException {
     }
 
     @Override
-    protected long getCurrentOffset() throws NoSuchElementException {
-      return currentOffset;
-    }
-
-    @Override
-    public void close() {
+    public synchronized void close() {
       storageClient.close();
     }
 
     @Override
     public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
-      return (BigQueryStorageStreamSource<T>) super.getCurrentSource();
+      return source;
     }
 
     @Override
-    public boolean allowsDynamicSplitting() {
-      return false;
+    public BoundedSource<T> splitAtFraction(double fraction) {
+      Metrics.counter(BigQueryStorageStreamReader.class, "split-at-fraction-calls").inc();
+      LOGGER.info(
+          "Received split request for stream '{}' at fraction {}.",
+          source.stream.getName(),
+          fraction);
+
+      SplitReadStreamRequest splitRequest =
+          SplitReadStreamRequest.newBuilder()
+              .setOriginalStream(source.stream)
+              // TODO(aryann): Once we rebuild the generated client code, we should change this to
+              // use setFraction().
+              .setUnknownFields(
+                  UnknownFieldSet.newBuilder()
+                      .addField(
+                          2,
+                          UnknownFieldSet.Field.newBuilder()
+                              .addFixed32(java.lang.Float.floatToIntBits((float) fraction))
+                              .build())
+                      .build())
+              .build();
+      SplitReadStreamResponse splitResponse = storageClient.splitReadStream(splitRequest);
+
+      if (!splitResponse.hasPrimaryStream() || !splitResponse.hasRemainderStream()) {
+        // No more splits are possible!
+        Metrics.counter(
+                BigQueryStorageStreamReader.class,
+                "split-at-fraction-calls-failed-due-to-impossible-split-point")
+            .inc();
+        LOGGER.info("Stream '{}' cannot be split at {}.", source.stream.getName(), fraction);
+        return null;
+      }
+
+      // We may be able to split this source. Before continuing, we pause the reader thread and
+      // replace its current source with the primary stream iff the reader has not moved past
+      // the split point.
+      synchronized (this) {
+        BigQueryServerStream<ReadRowsResponse> newResponseStream;
+        Iterator<ReadRowsResponse> newResponseIterator;
+        try {
+          newResponseStream =
+              storageClient.readRows(
+                  ReadRowsRequest.newBuilder()
+                      .setReadPosition(
+                          StreamPosition.newBuilder()
+                              .setStream(splitResponse.getPrimaryStream())
+                              .setOffset(currentOffset + 1))
+                      .build());
+          newResponseIterator = newResponseStream.iterator();
+          newResponseIterator.hasNext();
+        } catch (FailedPreconditionException e) {
+          // The current source has already moved past the split point, so this split attempt
+          // is unsuccessful.
+          Metrics.counter(
+                  BigQueryStorageStreamReader.class,
+                  "split-at-fraction-calls-failed-due-to-bad-split-point")
+              .inc();
+          LOGGER.info(
+              "Split of stream '{}' abandoned because the primary stream is to the left of "
+                  + "the split fraction {}.",
+              source.stream.getName(),
+              fraction);
+          return null;
+        } catch (Exception e) {
+          Metrics.counter(
+                  BigQueryStorageStreamReader.class,
+                  "split-at-fraction-calls-failed-due-to-other-reasons")
+              .inc();
+          throw e;
+        }
+
+        // Cancels the parent stream before replacing it with the primary stream.
+        responseStream.cancel();
+
+        currentOffset++;
+        source = source.fromExisting(splitResponse.getPrimaryStream());
 
 Review comment:
   The offset is not a property of the source or stream resource. It's a 
property of the ReadRows request/streaming RPC. Above we create a new 
client-server stream that begins with the right offset (responseStream) and we 
replace the current one with it.
   
   However, we also save a reference to the BigQueryStorageStreamSource 
pointing to the primary stream, and a client can get access to that through 
BigQueryStorageStreamReader#getCurrentSource. Does that source need to start at 
whatever offset the parent reader was at when split happened OR does it have to 
start at 0 (i.e., retaining the property of starting where the parent stream 
initially started)?
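The distinction the review comment draws can be made concrete with a toy model. This is a hypothetical sketch, not the connector's code: `OffsetModel`, `StreamSource`, `StreamReader`, `splitAt`, `drain`, and the in-memory list standing in for a server stream are all invented for illustration. It assumes, as the diff's `setOffset(currentOffset + 1)` on the primary stream suggests, that the primary stream's offsets line up with the parent's. Because the source carries only a stream, the splitting reader resumes at `currentOffset + 1`, while a fresh reader built from `getCurrentSource()` starts at offset 0 and re-reads rows the parent reader already emitted. The model does not settle which behavior `getCurrentSource()` should have; it only shows that the two options diverge.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: the read offset lives in the reader, not in the source. */
public class OffsetModel {

  /** A source only names a stream; here a "stream" is an in-memory list of rows. */
  public static final class StreamSource {
    final String name;
    final List<String> rows;

    public StreamSource(String name, List<String> rows) {
      this.name = name;
      this.rows = rows;
    }

    /** Loosely mirrors source.fromExisting(stream): same kind of source, different stream. */
    StreamSource fromExisting(String newName, List<String> newRows) {
      return new StreamSource(newName, newRows);
    }
  }

  /** Stands in for a reader whose offset belongs to its current ReadRows call. */
  public static final class StreamReader {
    private StreamSource source;
    private long currentOffset = -1; // -1 means nothing has been read yet

    public StreamReader(StreamSource source) {
      this.source = source;
    }

    public boolean advance() {
      if (currentOffset + 1 >= source.rows.size()) {
        return false;
      }
      currentOffset++;
      return true;
    }

    public String getCurrent() {
      return source.rows.get((int) currentOffset);
    }

    public StreamSource getCurrentSource() {
      return source;
    }

    /**
     * Keeps rows [0, splitRow) as the primary stream. The reader keeps its offset and
     * resumes at currentOffset + 1. Fails if the reader already consumed a remainder row.
     */
    public boolean splitAt(int splitRow) {
      if (currentOffset >= splitRow) {
        return false;
      }
      source = source.fromExisting(source.name + "/primary", source.rows.subList(0, splitRow));
      return true;
    }
  }

  /** Drains a reader and returns every row it produces from its current position. */
  public static List<String> drain(StreamReader reader) {
    List<String> out = new ArrayList<>();
    while (reader.advance()) {
      out.add(reader.getCurrent());
    }
    return out;
  }

  public static void main(String[] args) {
    StreamSource parent =
        new StreamSource("stream-0", List.of("r0", "r1", "r2", "r3", "r4", "r5"));
    StreamReader reader = new StreamReader(parent);
    reader.advance(); // r0
    reader.advance(); // r1, so currentOffset == 1
    reader.splitAt(4); // primary keeps r0..r3

    // The splitting reader continues from offset 2 of the primary stream.
    System.out.println(drain(reader)); // [r2, r3]
    // A fresh reader built from getCurrentSource() starts at offset 0.
    System.out.println(drain(new StreamReader(reader.getCurrentSource()))); // [r0, r1, r2, r3]
  }
}
```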
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 260683)
            Time Spent: 1h 40m  (was: 1.5h)
    Remaining Estimate: 502h 20m  (was: 502.5h)

> Add support for dynamic worker re-balancing when reading BigQuery data using Cloud Dataflow
> -------------------------------------------------------------------------------------------
>
>                 Key: BEAM-7495
>                 URL: https://issues.apache.org/jira/browse/BEAM-7495
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Aryan Naraghi
>            Assignee: Aryan Naraghi
>            Priority: Major
>   Original Estimate: 504h
>          Time Spent: 1h 40m
>  Remaining Estimate: 502h 20m
>
> Currently, the BigQuery connector for reading data with the BigQuery Storage
> API does not implement any of the source facilities that Dataflow uses to
> split streams.
>  
> On the server side, the BigQuery Storage API supports splitting streams at a
> fraction. Adding support for this to the connector lets Dataflow split
> streams, which unlocks dynamic worker re-balancing.
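The idea of splitting at a fraction can be illustrated, very loosely, on a plain offset range. The sketch below is hypothetical: `FractionSplit`, `Range`, `Split`, and the ceiling rounding rule are assumptions, and in the connector the actual split point is chosen by the BigQuery Storage server in response to a split request, not computed by the client. It turns a fraction into a split offset, keeps the left part as the primary that the current worker finishes, and hands the right part (the remainder) to another worker. The split is refused when the reader has already reached the split point or when either part would be empty.

```java
/** Hypothetical sketch of fraction-based splitting over a half-open offset range. */
public class FractionSplit {

  /** Half-open range of row offsets [start, end). */
  public static final class Range {
    public final long start;
    public final long end;

    public Range(long start, long end) {
      if (start > end) {
        throw new IllegalArgumentException("start > end");
      }
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  /** A successful split: what the current worker keeps and what another worker gets. */
  public static final class Split {
    public final Range primary;
    public final Range remainder;

    Split(Range primary, Range remainder) {
      this.primary = primary;
      this.remainder = remainder;
    }
  }

  /**
   * Splits `range` at `fraction` of its size, given that the reader has consumed every
   * offset up to and including `lastConsumed` (start - 1 if none). Returns null when no
   * split is possible.
   */
  public static Split splitAtFraction(Range range, long lastConsumed, double fraction) {
    if (!(fraction > 0.0 && fraction < 1.0)) {
      return null;
    }
    long splitPoint = range.start + (long) Math.ceil(fraction * (range.end - range.start));
    // Both halves must be non-empty, and the reader must not have reached the split point.
    if (splitPoint <= range.start || splitPoint >= range.end || lastConsumed >= splitPoint) {
      return null;
    }
    return new Split(new Range(range.start, splitPoint), new Range(splitPoint, range.end));
  }

  public static void main(String[] args) {
    Range stream = new Range(0, 1000);
    Split s = splitAtFraction(stream, 99, 0.25);
    System.out.println(s.primary + " " + s.remainder); // [0, 250) [250, 1000)
    // Prints null: the reader has already passed offset 500.
    System.out.println(splitAtFraction(stream, 600, 0.5));
  }
}
```

Returning null for a refused split loosely mirrors the early `return null` paths in the diff above.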



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
