This is an automated email from the ASF dual-hosted git repository.

mikhail pushed a commit to branch release-2.17.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.17.0 by this push:
     new 7ffacbd  Fix a bug related to zero-row responses
     new 0e0b8db  Merge pull request #10168 from kanterov/cherry-pick-beam-8504
7ffacbd is described below

commit 7ffacbd0b83fb1121337e222a46217ea49150c05
Author: Aryan Naraghi <[email protected]>
AuthorDate: Mon Nov 4 15:03:26 2019 -0800

    Fix a bug related to zero-row responses
    
    In some rare cases, the server can include zero rows in the response 
messages,
    leading to a zero count of rows and a progress report that does not
    increase. This change relaxes some of our preconditions.
---
 .../sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java   | 14 +++++++-------
 .../sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java     |  3 +++
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
index d9c3888..965e3b4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
@@ -233,19 +233,19 @@ public class BigQueryStorageStreamSource<T> extends 
BoundedSource<T> {
         fractionConsumedFromCurrentResponse = 
getFractionConsumed(currentResponse);
 
         Preconditions.checkArgument(
-            totalRowCountFromCurrentResponse > 0L,
-            "Row count from current response (%s) must be greater than one.",
+            totalRowCountFromCurrentResponse >= 0L,
+            "Row count from current response (%s) must be greater than or 
equal to zero.",
             totalRowCountFromCurrentResponse);
         Preconditions.checkArgument(
             0f <= fractionConsumedFromCurrentResponse && 
fractionConsumedFromCurrentResponse <= 1f,
             "Fraction consumed from current response (%s) is not in the range 
[0.0, 1.0].",
             fractionConsumedFromCurrentResponse);
         Preconditions.checkArgument(
-            fractionConsumedFromPreviousResponse < 
fractionConsumedFromCurrentResponse,
-            "Fraction consumed from previous response (%s) is not less than 
fraction consumed "
-                + "from current response (%s).",
-            fractionConsumedFromPreviousResponse,
-            fractionConsumedFromCurrentResponse);
+            fractionConsumedFromPreviousResponse <= 
fractionConsumedFromCurrentResponse,
+            "Fraction consumed from the current response (%s) has to be larger 
than or equal to "
+                + "the fraction consumed from the previous response (%s).",
+            fractionConsumedFromCurrentResponse,
+            fractionConsumedFromPreviousResponse);
       }
 
       record = datumReader.read(record, decoder);
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index e6f8eeb..fe8b4d9 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -861,6 +861,9 @@ public class BigQueryIOStorageReadTest {
             // N.B.: All floating point numbers used in this test can be 
represented without
             // a loss of precision.
             createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.250),
+            // Some responses may contain zero results, so we must ensure that 
we are resilient
+            // to such responses.
+            createResponse(AVRO_SCHEMA, Lists.newArrayList(), 0.250),
             createResponse(AVRO_SCHEMA, records.subList(2, 4), 0.500),
             createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.875));
 

Reply via email to