This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 10b6342c260 Fix assert message, fix streaming mode (#31540)
10b6342c260 is described below

commit 10b6342c26005d52963d410efc1a78e4bd03e888
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Fri Jun 7 17:32:31 2024 +0400

    Fix assert message, fix streaming mode (#31540)
---
 .../org/apache/beam/it/gcp/bigquery/BigQueryIOST.java | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java
index a444449df8a..22ff94e293b 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java
@@ -296,6 +296,11 @@ public final class BigQueryIOST extends IOStressTestBase {
                 .withSchema(schema)
                 
.withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(tempLocation)));
 
+    String experiments =
+        configuration.writeMethod.equals(STORAGE_API_AT_LEAST_ONCE_METHOD)
+            ? GcpOptions.STREAMING_ENGINE_EXPERIMENT + 
",streaming_mode_at_least_once"
+            : GcpOptions.STREAMING_ENGINE_EXPERIMENT;
+
     PipelineLauncher.LaunchConfig options =
         PipelineLauncher.LaunchConfig.builder("write-bigquery")
             .setSdk(PipelineLauncher.Sdk.JAVA)
@@ -307,7 +312,7 @@ public final class BigQueryIOST extends IOStressTestBase {
                     .toString())
             .addParameter("numWorkers", 
String.valueOf(configuration.numWorkers))
             .addParameter("maxNumWorkers", 
String.valueOf(configuration.maxNumWorkers))
-            .addParameter("experiments", 
GcpOptions.STREAMING_ENGINE_EXPERIMENT)
+            .addParameter("experiments", experiments)
             .build();
 
     PipelineLauncher.LaunchInfo launchInfo = pipelineLauncher.launch(project, 
region, options);
@@ -329,9 +334,17 @@ public final class BigQueryIOST extends IOStressTestBase {
 
     // Depending on writing method there might be duplicates on different 
sides (read or write).
     if (configuration.writeMethod.equals(STORAGE_API_AT_LEAST_ONCE_METHOD)) {
-      assertTrue(rowCount >= numRecords);
+      assertTrue(
+          String.format(
+              "Number of rows in the table (%d) is less than the expected 
number (%d). Missing records: %d",
+              rowCount, (long) numRecords, (long) numRecords - rowCount),
+          rowCount >= numRecords);
     } else {
-      assertTrue(numRecords >= rowCount);
+      assertTrue(
+          String.format(
+              "Number of rows in the table (%d) is greater than the expected 
number (%d).",
+              rowCount, (long) numRecords),
+          numRecords >= rowCount);
     }
 
     // export metrics

Reply via email to