[
https://issues.apache.org/jira/browse/BEAM-7495?focusedWorklogId=293472&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-293472
]
ASF GitHub Bot logged work on BEAM-7495:
----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Aug/19 23:09
Start Date: 12/Aug/19 23:09
Worklog Time Spent: 10m
Work Description: aryann commented on pull request #9156: [BEAM-7495] Add
fine-grained progress reporting
URL: https://github.com/apache/beam/pull/9156#discussion_r313165358
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
##########
@@ -210,20 +219,50 @@ private synchronized boolean readNextRecord() throws IOException {
         return false;
       }
-      // N.B.: For simplicity, we update fractionConsumed once a new response is fetched, not
-      // when we reach the end of the current response. In practice, this choice is not
-      // consequential.
-      fractionConsumed = fractionConsumedFromLastResponse;
-      ReadRowsResponse nextResponse = responseIterator.next();
+      fractionConsumedFromPreviousResponse = fractionConsumedFromCurrentResponse;
+      ReadRowsResponse currentResponse = responseIterator.next();
       decoder =
           DecoderFactory.get()
               .binaryDecoder(
-                  nextResponse.getAvroRows().getSerializedBinaryRows().toByteArray(), decoder);
-      fractionConsumedFromLastResponse = getFractionConsumed(nextResponse);
+                  currentResponse.getAvroRows().getSerializedBinaryRows().toByteArray(), decoder);
+
+      // Since we now have a new response, reset the row counter for the current response.
+      rowsReadFromCurrentResponse = 0L;
+
+      totalRowCountFromCurrentResponse = currentResponse.getAvroRows().getRowCount();
+      fractionConsumedFromCurrentResponse = getFractionConsumed(currentResponse);
+
+      Preconditions.checkArgument(
+          totalRowCountFromCurrentResponse > 0L,
+          "Row count from current response (%s) must be greater than zero.",
+          totalRowCountFromCurrentResponse);
+      Preconditions.checkArgument(
+          0f <= fractionConsumedFromCurrentResponse && fractionConsumedFromCurrentResponse <= 1f,
+          "Fraction consumed from current response (%s) is not in the range [0.0, 1.0].",
+          fractionConsumedFromCurrentResponse);
+      Preconditions.checkArgument(
+          fractionConsumedFromPreviousResponse < fractionConsumedFromCurrentResponse,
+          "Fraction consumed from previous response (%s) is not less than fraction consumed "
+              + "from current response (%s).",
+          fractionConsumedFromPreviousResponse,
+          fractionConsumedFromCurrentResponse);
     }
     record = datumReader.read(record, decoder);
     current = parseFn.apply(new SchemaAndRecord(record, tableSchema));
+
+    // Updates the fraction consumed value. This value is calculated by summing the fraction
+    // consumed value from the previous server response (or zero if we're consuming the first
+    // response) and by interpolating the fractional value in the current response based on how
+    // many rows have been consumed.
+    rowsReadFromCurrentResponse++;
+    fractionConsumed =
+        fractionConsumedFromPreviousResponse
+            + (fractionConsumedFromCurrentResponse - fractionConsumedFromPreviousResponse)
Review comment:
> So here, you compute the fraction consumed by adding to the previous fraction
> consumed. I guess this is where the bug was? (sorry, don't have a clear diff
> due to the fixup).
This section has not changed. There was never a bug here.
>
> Why do we calculate fraction consumed by adding to the previous fraction
> consumed?
We are not adding to the previous fractionConsumed. We are adding to
fractionConsumedFromPreviousResponse, which is the value returned by the last
response. That value represents how much progress you have made assuming you have
consumed all rows in that response. If you have a new response and have consumed
zero of its rows, then your progress == fractionConsumedFromPreviousResponse.
Otherwise, it is an interpolation between fractionConsumedFromPreviousResponse and
fractionConsumedFromCurrentResponse.
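To make the interpolation concrete, here is a minimal, self-contained sketch
(illustrative only, not the connector code; the class, method name, and the
hard-coded numbers are invented for this example, while the parameter names mirror
the fields discussed above):

public class FractionInterpolationSketch {
  static double interpolate(
      double fractionConsumedFromPreviousResponse,
      double fractionConsumedFromCurrentResponse,
      long rowsReadFromCurrentResponse,
      long totalRowCountFromCurrentResponse) {
    // Zero rows read -> progress equals the previous response's fraction;
    // all rows read -> progress equals the current response's fraction.
    return fractionConsumedFromPreviousResponse
        + (fractionConsumedFromCurrentResponse - fractionConsumedFromPreviousResponse)
            * ((double) rowsReadFromCurrentResponse / totalRowCountFromCurrentResponse);
  }

  public static void main(String[] args) {
    // Example: fractions 0.2 and 0.5 with 3 of 10 rows read gives 0.2 + 0.3 * 0.3 = 0.29.
    System.out.println(interpolate(0.2, 0.5, 3, 10));
  }
}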
> Previous response will be off in case of a split (as you probably found out).
> Won't it be more accurate to determine the fraction consumed by always
> considering current values? ("total consumed so far" / current size). Does the
> API limit this calculation somehow? (if so please add a clear comment about
> that).
The calculation will not be off in the case of a split. That was the bug I
addressed in the last commit. I probably should not have amended the commit and
should have added a separate commit instead. Apologies for the lack of proper
diffs. The fix is on line 365:
fractionConsumedFromCurrentResponse = fractionConsumed;
On the next call to readNextRecord(), fractionConsumedFromCurrentResponse becomes
fractionConsumedFromPreviousResponse. Using the last fractionConsumed value for
fractionConsumedFromPreviousResponse is safe because the following response's
consumed fraction will be larger (the denominator is now smaller even if the
internal offset, i.e., the numerator, is the same), so we can safely interpolate
between those two values. We have an internal end-to-end test that covers this.
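To illustrate the denominator argument with made-up numbers (purely hypothetical
values, not taken from the API):

public class SplitFractionExample {
  public static void main(String[] args) {
    // Before the split, suppose the server's fraction corresponds to an internal
    // offset of 400 rows against a stream of 1000 rows.
    double fractionBeforeSplit = 400.0 / 1000.0; // 0.4
    // After splitting at fraction 0.5, the primary stream ends near row 500, so a
    // later response reports the same offset against a smaller denominator.
    double fractionAfterSplit = 400.0 / 500.0; // 0.8
    // The interpolation range [previous, current] therefore keeps increasing.
    System.out.println(fractionBeforeSplit < fractionAfterSplit); // prints "true"
  }
}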
>
> It's hard for me to figure out whether this is correct or not without knowing
> what value is returned by the API call in getFractionConsumed() below. So please
> add a comment there as well and, if possible, add a link to a BQ public API doc
> that describes the value returned from that call.
The value is in the range [0.0, 1.0]. Does that answer your question?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 293472)
Time Spent: 12h 20m (was: 12h 10m)
Remaining Estimate: 491h 40m (was: 491h 50m)
> Add support for dynamic worker re-balancing when reading BigQuery data using
> Cloud Dataflow
> -------------------------------------------------------------------------------------------
>
> Key: BEAM-7495
> URL: https://issues.apache.org/jira/browse/BEAM-7495
> Project: Beam
> Issue Type: New Feature
> Components: io-java-gcp
> Reporter: Aryan Naraghi
> Assignee: Aryan Naraghi
> Priority: Major
> Original Estimate: 504h
> Time Spent: 12h 20m
> Remaining Estimate: 491h 40m
>
> Currently, the BigQuery connector for reading data using the BigQuery Storage
> API does not support any of the facilities on the source for Dataflow to
> split streams.
>
> On the server side, the BigQuery Storage API supports splitting streams at a
> fraction. By adding support to the connector, we enable Dataflow to split
> streams, which unlocks dynamic worker re-balancing.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)