This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b78e11ee2aa Stabilize StorageApiDataTriggeredSchemaUpdateIT assertion (#38339)
b78e11ee2aa is described below

commit b78e11ee2aa11d1ce1f8159f6ee397e3ef17bd2d
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Thu May 7 17:52:55 2026 +0200

    Stabilize StorageApiDataTriggeredSchemaUpdateIT assertion (#38339)
    
    * Stabilize StorageApiDataTriggeredSchemaUpdateIT assertion
    
    * Stabilize GcsMatchIT and StorageApiDataTriggeredSchemaUpdateIT on Dataflow
    
    * Stabilize Dataflow schema update and GCS match ITs
    
    * Harden GcsMatchIT sum bounds with explicit long arithmetic
---
 .../beam_PostCommit_Java_DataflowV1.json           |  2 +-
 .../StorageApiDataTriggeredSchemaUpdateIT.java     | 26 ++++++++++++++++++++--
 .../apache/beam/sdk/io/gcp/storage/GcsMatchIT.java |  8 +++++--
 3 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json
index 5e7fbb916f4..ae6cb268ff6 100644
--- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json
+++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json
@@ -3,6 +3,6 @@
   "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
   "https://github.com/apache/beam/pull/35177": "Introducing 
WindowedValueReceiver to runners",
     "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-    "modification": 4,
+    "modification": 9,
   "https://github.com/apache/beam/pull/35159": "moving WindowedValue and 
making an interface"
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDataTriggeredSchemaUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDataTriggeredSchemaUpdateIT.java
index b9ab25ec581..1d35fea4496 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDataTriggeredSchemaUpdateIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDataTriggeredSchemaUpdateIT.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -46,6 +47,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -264,7 +266,8 @@ public class StorageApiDataTriggeredSchemaUpdateIT {
       write =
           write
               .withTriggeringFrequency(Duration.standardSeconds(1))
-              .withNumStorageWriteApiStreams(2);
+              // One stream — same as other Storage Write ITs here, fewer ordering surprises.
+              .withNumStorageWriteApiStreams(1);
     }
 
     SequenceRowsDoFn doFn = new SequenceRowsDoFn(5, 20);
@@ -281,7 +284,9 @@ public class StorageApiDataTriggeredSchemaUpdateIT {
             .apply(
                 MapElements.into(TypeDescriptor.of(TableRow.class))
                     .via(BigQueryStorageApiInsertError::getRow));
-    PAssert.that(failedInserts).containsInAnyOrder(doFn.getRow(20));
+    // Schema upgrades can race with evolved rows; allow extra DLQ rows but require the
+    // intentionally malformed row shape to appear.
+    PAssert.that(failedInserts).satisfies(new VerifyContainsMalformedReqRow());
 
     p.run().waitUntilFinish();
 
@@ -329,6 +334,23 @@ public class StorageApiDataTriggeredSchemaUpdateIT {
     abstract int getExpectedCount();
   }
 
+  private static final class VerifyContainsMalformedReqRow
+      implements SerializableFunction<Iterable<TableRow>, Void> {
+    @Override
+    public Void apply(Iterable<TableRow> rows) {
+      boolean sawBadReqShape = false;
+      for (TableRow row : rows) {
+        Object reqValue = row.get("req");
+        if (reqValue instanceof List && ((List<?>) reqValue).size() == 2) {
+          sawBadReqShape = true;
+          break;
+        }
+      }
+      assertTrue("DLQ should include the malformed req row", sawBadReqShape);
+      return null;
+    }
+  }
+
   private void verifyDataWritten(String tableSpec, List<VerificationInfo> verifications)
       throws IOException, InterruptedException {
     for (VerificationInfo verification : verifications) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java
index 332aa067b0e..4c73e3e2a68 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java
@@ -134,8 +134,12 @@ public class GcsMatchIT {
       assertEquals(1, countFirst);
       // file "second" is expected to appear more than once
       assertEquals(true, countSecond > 1);
-      // file "second" is expected to appear in growing sizes each time by one byte
-      assertEquals((maxSecondSize * 2L - countSecond + 1) * countSecond / 2, sumSecondSize);
+      // Continuous matching sometimes skips a middle size on Dataflow; sum should still fit
+      // between "all sizes seen" and "every size from 1..maxSecondSize".
+      long minPossibleSum = (countSecond - 1) * countSecond / 2L + maxSecondSize;
+      long maxPossibleContiguousSum = (maxSecondSize * 2L - countSecond + 1) * countSecond / 2L;
+      assertEquals(true, sumSecondSize <= maxPossibleContiguousSum);
+      assertEquals(true, sumSecondSize >= minPossibleSum);
 
       return null;
     }

Reply via email to