This is an automated email from the ASF dual-hosted git repository.
mikhail pushed a commit to branch release-2.17.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.17.0 by this push:
new 7ffacbd Fix a bug related to zero-row responses
new 0e0b8db Merge pull request #10168 from kanterov/cherry-pick-beam-8504
7ffacbd is described below
commit 7ffacbd0b83fb1121337e222a46217ea49150c05
Author: Aryan Naraghi <[email protected]>
AuthorDate: Mon Nov 4 15:03:26 2019 -0800
Fix a bug related to zero-row responses
In some rare cases, the server can include zero rows in the response
messages, leading to a zero count of rows and a progress report that does not
increase. This change relaxes some of our preconditions.
---
.../sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java | 14 +++++++-------
.../sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java | 3 +++
2 files changed, 10 insertions(+), 7 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
index d9c3888..965e3b4 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
@@ -233,19 +233,19 @@ public class BigQueryStorageStreamSource<T> extends
BoundedSource<T> {
fractionConsumedFromCurrentResponse =
getFractionConsumed(currentResponse);
Preconditions.checkArgument(
- totalRowCountFromCurrentResponse > 0L,
- "Row count from current response (%s) must be greater than one.",
+ totalRowCountFromCurrentResponse >= 0L,
+ "Row count from current response (%s) must be greater than or
equal to zero.",
totalRowCountFromCurrentResponse);
Preconditions.checkArgument(
0f <= fractionConsumedFromCurrentResponse &&
fractionConsumedFromCurrentResponse <= 1f,
"Fraction consumed from current response (%s) is not in the range
[0.0, 1.0].",
fractionConsumedFromCurrentResponse);
Preconditions.checkArgument(
- fractionConsumedFromPreviousResponse <
fractionConsumedFromCurrentResponse,
- "Fraction consumed from previous response (%s) is not less than
fraction consumed "
- + "from current response (%s).",
- fractionConsumedFromPreviousResponse,
- fractionConsumedFromCurrentResponse);
+ fractionConsumedFromPreviousResponse <=
fractionConsumedFromCurrentResponse,
+ "Fraction consumed from the current response (%s) has to be larger
than or equal to "
+ + "the fraction consumed from the previous response (%s).",
+ fractionConsumedFromCurrentResponse,
+ fractionConsumedFromPreviousResponse);
}
record = datumReader.read(record, decoder);
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index e6f8eeb..fe8b4d9 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -861,6 +861,9 @@ public class BigQueryIOStorageReadTest {
// N.B.: All floating point numbers used in this test can be
represented without
// a loss of precision.
createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.250),
+ // Some responses may contain zero results, so we must ensure that
we are resilient
+ // to such responses.
+ createResponse(AVRO_SCHEMA, Lists.newArrayList(), 0.250),
createResponse(AVRO_SCHEMA, records.subList(2, 4), 0.500),
createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.875));