[ 
https://issues.apache.org/jira/browse/BEAM-7495?focusedWorklogId=289819&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-289819
 ]

ASF GitHub Bot logged work on BEAM-7495:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Aug/19 16:12
            Start Date: 06/Aug/19 16:12
    Worklog Time Spent: 10m 
      Work Description: aryann commented on pull request #9156: [BEAM-7495] Add 
fine-grained progress reporting
URL: https://github.com/apache/beam/pull/9156#discussion_r311150922
 
 

 ##########
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
 ##########
 @@ -210,20 +219,50 @@ private synchronized boolean readNextRecord() throws IOException {
           return false;
         }
 
-        // N.B.: For simplicity, we update fractionConsumed once a new response is fetched, not
-        // when we reach the end of the current response. In practice, this choice is not
-        // consequential.
-        fractionConsumed = fractionConsumedFromLastResponse;
-        ReadRowsResponse nextResponse = responseIterator.next();
+        fractionConsumedFromPreviousResponse = fractionConsumedFromCurrentResponse;
+        ReadRowsResponse currentResponse = responseIterator.next();
         decoder =
             DecoderFactory.get()
                 .binaryDecoder(
-                    nextResponse.getAvroRows().getSerializedBinaryRows().toByteArray(), decoder);
-        fractionConsumedFromLastResponse = getFractionConsumed(nextResponse);
+                    currentResponse.getAvroRows().getSerializedBinaryRows().toByteArray(), decoder);
+
+        // Since we now have a new response, reset the row counter for the current response.
+        rowsReadFromCurrentResponse = 0L;
+
+        totalRowCountFromCurrentResponse = currentResponse.getAvroRows().getRowCount();
+        fractionConsumedFromCurrentResponse = getFractionConsumed(currentResponse);
+
+        Preconditions.checkArgument(
+            totalRowCountFromCurrentResponse > 0L,
+            "Row count from current response (%s) must be greater than zero.",
+            totalRowCountFromCurrentResponse);
+        Preconditions.checkArgument(
+            0f <= fractionConsumedFromCurrentResponse && fractionConsumedFromCurrentResponse <= 1f,
+            "Fraction consumed from current response (%s) is not in the range [0.0, 1.0].",
+            fractionConsumedFromCurrentResponse);
+        Preconditions.checkArgument(
+            fractionConsumedFromPreviousResponse < fractionConsumedFromCurrentResponse,
+            "Fraction consumed from previous response (%s) is not less than fraction consumed "
+                + "from current response (%s).",
+            fractionConsumedFromPreviousResponse,
+            fractionConsumedFromCurrentResponse);
       }
 
       record = datumReader.read(record, decoder);
       current = parseFn.apply(new SchemaAndRecord(record, tableSchema));
+
+      // Updates the fraction consumed value. This value is calculated by summing the fraction
+      // consumed value from the previous server response (or zero if we're consuming the first
+      // response) and by interpolating the fractional value in the current response based on how
+      // many rows have been consumed.
+      rowsReadFromCurrentResponse++;
+      fractionConsumed =
 
 Review comment:
   I purposefully put this computation in readNextRecord(). Ken pointed this 
out in an earlier comment.
   
   This is clearer because (1) the computation sits next to the code that 
updates the variables involved; and (2) moving it to getFractionConsumed() 
makes producing 1.0 at the end of the stream trickier: you would need either 
another field to signal the end of the stream or some fiddling with the 
existing variables so that the computation yields 1.0. Both schemes make the 
code a little harder to follow than what we have now (i.e., 
"fractionConsumed = 1d").
   
   I don't think we should worry about the efficiency of this. It is a cheap 
numerical computation, and I suspect the Avro parsing for each row will 
dominate the time spent (at least in the case where no I/O takes place).
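   
   For illustration, here is a minimal sketch of the interpolation that the 
code comment in the hunk describes. The hunk is cut off at "fractionConsumed 
=", so the exact expression below is an assumption (plain linear 
interpolation between the previous and current server-reported fractions), 
and the class and method names are hypothetical, not part of the PR:

```java
/**
 * Hypothetical stand-alone sketch; ProgressInterpolator and its methods are
 * illustrative names, not code from BigQueryStorageStreamSource.
 */
final class ProgressInterpolator {
  private double fractionFromPreviousResponse = 0d;
  private double fractionFromCurrentResponse = 0d;
  private long rowsReadFromCurrentResponse = 0L;
  private long totalRowCountFromCurrentResponse = 0L;
  private double fractionConsumed = 0d;

  /** Called when a new server response arrives with its row count and fraction. */
  void onNewResponse(long rowCount, double serverFraction) {
    if (rowCount <= 0L) {
      throw new IllegalArgumentException("Row count must be greater than zero.");
    }
    if (serverFraction < 0d || serverFraction > 1d) {
      throw new IllegalArgumentException("Fraction is not in the range [0.0, 1.0].");
    }
    if (serverFraction <= fractionFromCurrentResponse) {
      throw new IllegalArgumentException("Fraction must increase between responses.");
    }
    fractionFromPreviousResponse = fractionFromCurrentResponse;
    fractionFromCurrentResponse = serverFraction;
    totalRowCountFromCurrentResponse = rowCount;
    // A new response resets the per-response row counter.
    rowsReadFromCurrentResponse = 0L;
  }

  /** Called after each row is decoded; updates the cached progress value. */
  void onRowRead() {
    if (rowsReadFromCurrentResponse >= totalRowCountFromCurrentResponse) {
      throw new IllegalStateException("No rows left in the current response.");
    }
    rowsReadFromCurrentResponse++;
    // Assumed formula: previous fraction plus a linear share of the span
    // covered by the current response.
    fractionConsumed =
        fractionFromPreviousResponse
            + (fractionFromCurrentResponse - fractionFromPreviousResponse)
                * rowsReadFromCurrentResponse
                / (double) totalRowCountFromCurrentResponse;
  }

  /** End of stream is a plain assignment; no extra field is needed. */
  void onEndOfStream() {
    fractionConsumed = 1d;
  }

  /** The getter only returns the cached value. */
  double getFractionConsumed() {
    return fractionConsumed;
  }
}
```

   Under these assumptions, the progress update stays next to the row 
counter it depends on, and the end-of-stream case stays a one-line 
assignment.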
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 289819)
            Time Spent: 10h 40m  (was: 10.5h)
    Remaining Estimate: 493h 20m  (was: 493.5h)

> Add support for dynamic worker re-balancing when reading BigQuery data using 
> Cloud Dataflow
> -------------------------------------------------------------------------------------------
>
>                 Key: BEAM-7495
>                 URL: https://issues.apache.org/jira/browse/BEAM-7495
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Aryan Naraghi
>            Assignee: Aryan Naraghi
>            Priority: Major
>   Original Estimate: 504h
>          Time Spent: 10h 40m
>  Remaining Estimate: 493h 20m
>
> Currently, the BigQuery connector for reading data using the BigQuery Storage 
> API does not support any of the facilities on the source for Dataflow to 
> split streams.
>  
> On the server side, the BigQuery Storage API supports splitting streams at a 
> fraction. By adding support to the connector, we enable Dataflow to split 
> streams, which unlocks dynamic worker re-balancing.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
