[ https://issues.apache.org/jira/browse/BEAM-7495?focusedWorklogId=293472&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-293472 ]

ASF GitHub Bot logged work on BEAM-7495:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Aug/19 23:09
            Start Date: 12/Aug/19 23:09
    Worklog Time Spent: 10m 
      Work Description: aryann commented on pull request #9156: [BEAM-7495] Add fine-grained progress reporting
URL: https://github.com/apache/beam/pull/9156#discussion_r313165358
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
 ##########
 @@ -210,20 +219,50 @@ private synchronized boolean readNextRecord() throws IOException {
           return false;
         }
 
-        // N.B.: For simplicity, we update fractionConsumed once a new response is fetched, not
-        // when we reach the end of the current response. In practice, this choice is not
-        // consequential.
-        fractionConsumed = fractionConsumedFromLastResponse;
-        ReadRowsResponse nextResponse = responseIterator.next();
+        fractionConsumedFromPreviousResponse = fractionConsumedFromCurrentResponse;
+        ReadRowsResponse currentResponse = responseIterator.next();
         decoder =
             DecoderFactory.get()
                 .binaryDecoder(
-                    nextResponse.getAvroRows().getSerializedBinaryRows().toByteArray(), decoder);
-        fractionConsumedFromLastResponse = getFractionConsumed(nextResponse);
+                    currentResponse.getAvroRows().getSerializedBinaryRows().toByteArray(), decoder);
+
+        // Since we now have a new response, reset the row counter for the current response.
+        rowsReadFromCurrentResponse = 0L;
+
+        totalRowCountFromCurrentResponse = currentResponse.getAvroRows().getRowCount();
+        fractionConsumedFromCurrentResponse = getFractionConsumed(currentResponse);
+
+        Preconditions.checkArgument(
+            totalRowCountFromCurrentResponse > 0L,
+            "Row count from current response (%s) must be greater than zero.",
+            totalRowCountFromCurrentResponse);
+        Preconditions.checkArgument(
+            0f <= fractionConsumedFromCurrentResponse && fractionConsumedFromCurrentResponse <= 1f,
+            "Fraction consumed from current response (%s) is not in the range [0.0, 1.0].",
+            fractionConsumedFromCurrentResponse);
+        Preconditions.checkArgument(
+            fractionConsumedFromPreviousResponse < fractionConsumedFromCurrentResponse,
+            "Fraction consumed from previous response (%s) is not less than fraction consumed "
+                + "from current response (%s).",
+            fractionConsumedFromPreviousResponse,
+            fractionConsumedFromCurrentResponse);
       }
 
       record = datumReader.read(record, decoder);
       current = parseFn.apply(new SchemaAndRecord(record, tableSchema));
+
+      // Updates the fraction consumed value. This value is calculated by summing the fraction
+      // consumed value from the previous server response (or zero if we're consuming the first
+      // response) and by interpolating the fractional value in the current response based on how
+      // many rows have been consumed.
+      rowsReadFromCurrentResponse++;
+      fractionConsumed =
+          fractionConsumedFromPreviousResponse
+              + (fractionConsumedFromCurrentResponse - fractionConsumedFromPreviousResponse)
 
 Review comment:
   > So here, you compute the fraction consumed by adding to the previous fraction
   > consumed. I guess this is where the bug was? (Sorry, I don't have a clear diff
   > due to the fixup.)
   
   This section has not changed. There was never a bug here.
   
   > 
   > Why do we calculate the fraction consumed by adding to the previous fraction
   > consumed?
   
   We are not adding to the previous fractionConsumed. We are adding to
   fractionConsumedFromPreviousResponse, which is the value returned from the last
   response. That value represents how much progress you have made assuming you
   have consumed all of the rows in that response. If you have a new response and
   have consumed zero rows from it, then your progress ==
   fractionConsumedFromPreviousResponse. Otherwise, it's an interpolation between
   fractionConsumedFromPreviousResponse and fractionConsumedFromCurrentResponse.
   
   > Previous response will be off in case of a split (as you probably found out).
   > Won't it be more accurate to determine the fraction consumed by always
   > considering current values? ("total consumed so far" / current size). Does the
   > API limit this calculation somehow? (If so, please add a clear comment about
   > that.)
   
   The calculation will not be off in the case of a split. That was the bug that I
   addressed in the last commit. I probably should not have amended the commit and
   should instead have added a separate commit; apologies for the lack of proper
   diffs. The fix is on line 365:
   
   fractionConsumedFromCurrentResponse = fractionConsumed;
   
   On the next call to readNextRecord(), fractionConsumedFromCurrentResponse
   becomes fractionConsumedFromPreviousResponse. Using the last fractionConsumed
   value for fractionConsumedFromPreviousResponse is safe because the following
   response's consumed value will be larger (the denominator is now smaller even if
   the internal offset, i.e., the numerator, is the same), so we can safely
   interpolate between those two values. We have an internal end-to-end test that
   covers this.
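   
   To illustrate with made-up numbers (purely hypothetical, not values from the
   BigQuery Storage API), here is a small standalone example of why progress stays
   monotone across a split:
   
       public class SplitInterpolationExample {
         public static void main(String[] args) {
           // Last fractionConsumed reported before the split, carried over as the
           // "previous" fraction.
           double fractionFromPreviousResponse = 0.40;
           // After the split the stream is shorter, so the server reports a larger fraction
           // for the next response (same offset, smaller denominator), e.g. 0.55 instead of 0.45.
           double fractionFromCurrentResponse = 0.55;
           long totalRows = 4;
           for (long rowsRead = 0; rowsRead <= totalRows; rowsRead++) {
             double fractionConsumed =
                 fractionFromPreviousResponse
                     + (fractionFromCurrentResponse - fractionFromPreviousResponse)
                         * ((double) rowsRead / totalRows);
             // Output climbs monotonically from 0.400 up to 0.550, so the reported progress
             // never moves backwards after the split.
             System.out.printf("rowsRead=%d fractionConsumed=%.3f%n", rowsRead, fractionConsumed);
           }
         }
       }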
   
   > 
   > It's hard for me to figure out whether this is correct or not without 
knowing what the value returned by API call in getFractionConsumed() below. So 
please add a comment there as well and if possible and add a link to the a BQ 
public API doc that gives a description about the value returned from that call.
   
   The value is in the range [0.0, 1.0]. Does that answer your question?
   
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 293472)
            Time Spent: 12h 20m  (was: 12h 10m)
    Remaining Estimate: 491h 40m  (was: 491h 50m)

> Add support for dynamic worker re-balancing when reading BigQuery data using 
> Cloud Dataflow
> -------------------------------------------------------------------------------------------
>
>                 Key: BEAM-7495
>                 URL: https://issues.apache.org/jira/browse/BEAM-7495
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Aryan Naraghi
>            Assignee: Aryan Naraghi
>            Priority: Major
>   Original Estimate: 504h
>          Time Spent: 12h 20m
>  Remaining Estimate: 491h 40m
>
> Currently, the BigQuery connector for reading data using the BigQuery Storage 
> API does not support any of the facilities on the source for Dataflow to 
> split streams.
>  
> On the server side, the BigQuery Storage API supports splitting streams at a 
> fraction. By adding support to the connector, we enable Dataflow to split 
> streams, which unlocks dynamic worker re-balancing.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
