gemini-code-assist[bot] commented on code in PR #38971:
URL: https://github.com/apache/beam/pull/38971#discussion_r3415923612


##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java:
##########
@@ -406,6 +433,28 @@ private SendMessageBatchRequest anyRequest() {
     return any();
   }
 
+  /** Captures all batch requests, verifying the given mode, and groups them 
by queue url. */
+  private Map<String, List<SendMessageBatchRequest>> captureBatchRequests(
+      org.mockito.verification.VerificationMode mode) {
+    ArgumentCaptor<SendMessageBatchRequest> captor =
+        ArgumentCaptor.forClass(SendMessageBatchRequest.class);
+    verify(sqs, mode).sendMessageBatch(captor.capture());
+    return 
captor.getAllValues().stream().collect(groupingBy(SendMessageBatchRequest::queueUrl));
+  }
+
+  /** Asserts that the requests contain exactly the expected message bodies, 
each exactly once. */
+  private void assertMessageBodies(
+      @Nullable List<SendMessageBatchRequest> requests, IntStream 
expectedMsgs) {
+    assertThat(requests).isNotNull();
+    assertThat(
+            requests.stream()
+                .flatMap(req -> req.entries().stream())
+                .map(SendMessageBatchRequestEntry::messageBody)
+                .collect(toList()))

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Since `requests` is annotated with `@Nullable`, dereferencing it directly 
with `requests.stream()` will trigger a Checker Framework nullness 
warning/error (e.g., `dereference.of.nullable`). AssertJ's 
`assertThat(requests).isNotNull()` is not natively recognized by the Checker 
Framework as a null-check guard.
   
   Using the already-imported `checkNotNull` utility from Guava resolves this 
cleanly and ensures compile-time null safety.
   
   ```suggestion
       List<SendMessageBatchRequest> safeRequests = checkNotNull(requests);
       assertThat(
               safeRequests.stream()
                   .flatMap(req -> req.entries().stream())
                   .map(SendMessageBatchRequestEntry::messageBody)
                   .collect(toList()))
   ```



##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java:
##########
@@ -263,10 +267,16 @@ public void testWriteBatchesWithTimeout() {
 
     p.run().waitUntilFinish();
 
-    SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
-    // due to added delay, batches are timed out on arrival of every 3rd msg
-    verify(sqs).sendMessageBatch(request("queue", entries[0], entries[1], 
entries[2]));
-    verify(sqs).sendMessageBatch(request("queue", entries[3], entries[4]));
+    // Nominally batches time out on arrival of every 3rd message ([0,1,2], 
[3,4]). The exact
+    // grouping depends on wall clock time and is unreliable on loaded 
machines (#38946), so
+    // verify timing-independent invariants instead: expired batches are 
flushed on append, so
+    // with >= 100ms between messages no batch can exceed 3 entries before 
exceeding the 150ms
+    // timeout, forcing at least 2 batches for 5 messages.
+    Map<String, List<SendMessageBatchRequest>> requests = 
captureBatchRequests(atLeast(2));
+    assertThat(requests.keySet()).containsExactly("queue");
+    assertMessageBodies(requests.get("queue"), range(0, 5));
+    assertThat(requests.get("queue"))
+        .allSatisfy(req -> 
assertThat(req.entries().size()).isLessThanOrEqualTo(3));

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To satisfy the Checker Framework's nullness analysis, avoid passing the 
nullable result of `requests.get("queue")` directly to `assertMessageBodies` 
and `assertThat`. Wrapping it with `checkNotNull` ensures null safety and 
avoids duplicate map lookups.
   
   ```java
       List<SendMessageBatchRequest> queueRequests = 
checkNotNull(requests.get("queue"));
       assertMessageBodies(queueRequests, range(0, 5));
       assertThat(queueRequests)
           .allSatisfy(req -> 
assertThat(req.entries().size()).isLessThanOrEqualTo(3));
   ```



##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java:
##########
@@ -337,11 +352,18 @@ public void testWriteBatchesToDynamicWithTimeout() {
 
     p.run().waitUntilFinish();
 
-    SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
-    // due to added delay, dynamic batches are timed out on arrival of every 
2nd msg (per batch)
-    verify(sqs).sendMessageBatch(request("even", entries[0], entries[2]));
-    verify(sqs).sendMessageBatch(request("uneven", entries[1], entries[3]));
-    verify(sqs).sendMessageBatch(request("even", entries[4]));
+    // Nominally dynamic batches are timed out on arrival of every 2nd message 
per batch
+    // ([0,2], [1,3], [4]). The exact grouping depends on wall clock time and 
is unreliable on
+    // loaded machines (#38946), so verify timing-independent invariants 
instead: per queue,
+    // consecutive messages are >= 200ms apart, so expired batches are flushed 
on append once
+    // reaching 2 entries, requiring at least 3 batches overall.
+    Map<String, List<SendMessageBatchRequest>> requests = 
captureBatchRequests(atLeast(3));
+    assertThat(requests.keySet()).containsExactlyInAnyOrder("even", "uneven");
+    assertMessageBodies(requests.get("even"), range(0, 5).filter(i -> i % 2 == 
0));
+    assertMessageBodies(requests.get("uneven"), range(0, 5).filter(i -> i % 2 
== 1));

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To satisfy the Checker Framework's nullness analysis, avoid passing the 
nullable results of `requests.get("even")` and `requests.get("uneven")` 
directly to `assertMessageBodies`. Wrapping them with `checkNotNull` ensures 
null safety.
   
   ```java
       List<SendMessageBatchRequest> evenRequests = 
checkNotNull(requests.get("even"));
       List<SendMessageBatchRequest> unevenRequests = 
checkNotNull(requests.get("uneven"));
       assertMessageBodies(evenRequests, range(0, 5).filter(i -> i % 2 == 0));
       assertMessageBodies(unevenRequests, range(0, 5).filter(i -> i % 2 == 1));
   ```



##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java:
##########
@@ -360,14 +382,19 @@ public void testWriteBatchesToDynamicWithStrictTimeout() {
 
     p.run().waitUntilFinish();
 
-    SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
-    // using strict timeouts batches, batches are timed out by a separate 
thread before any 2nd
-    // entry
-    verify(sqs).sendMessageBatch(request("even", entries[0]));
-    verify(sqs).sendMessageBatch(request("uneven", entries[1]));
-    verify(sqs).sendMessageBatch(request("even", entries[2]));
-    verify(sqs).sendMessageBatch(request("uneven", entries[3]));
-    verify(sqs).sendMessageBatch(request("even", entries[4]));
+    // Nominally the separate timeout thread flushes every batch before a 2nd 
entry arrives
+    // (5 singleton batches). The exact grouping depends on wall clock time 
and is unreliable
+    // on loaded machines (#38946), so verify timing-independent invariants 
instead. Expired
+    // batches are also flushed on append (independently of the timeout 
thread) and per queue
+    // consecutive messages are >= 200ms apart, so no batch can exceed 2 
entries and at least
+    // 3 batches are required overall.
+    Map<String, List<SendMessageBatchRequest>> requests = 
captureBatchRequests(atLeast(3));
+    assertThat(requests.keySet()).containsExactlyInAnyOrder("even", "uneven");
+    assertMessageBodies(requests.get("even"), range(0, 5).filter(i -> i % 2 == 
0));
+    assertMessageBodies(requests.get("uneven"), range(0, 5).filter(i -> i % 2 
== 1));

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To satisfy the Checker Framework's nullness analysis, avoid passing the 
nullable results of `requests.get("even")` and `requests.get("uneven")` 
directly to `assertMessageBodies`. Wrapping them with `checkNotNull` ensures 
null safety.
   
   ```java
       List<SendMessageBatchRequest> evenRequests = 
checkNotNull(requests.get("even"));
       List<SendMessageBatchRequest> unevenRequests = 
checkNotNull(requests.get("uneven"));
       assertMessageBodies(evenRequests, range(0, 5).filter(i -> i % 2 == 0));
       assertMessageBodies(unevenRequests, range(0, 5).filter(i -> i % 2 == 1));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to