gemini-code-assist[bot] commented on code in PR #38971:
URL: https://github.com/apache/beam/pull/38971#discussion_r3415923612
##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java:
##########
@@ -406,6 +433,28 @@ private SendMessageBatchRequest anyRequest() {
return any();
}
+ /** Captures all batch requests, verifying the given mode, and groups them by queue url. */
+ private Map<String, List<SendMessageBatchRequest>> captureBatchRequests(
+ org.mockito.verification.VerificationMode mode) {
+ ArgumentCaptor<SendMessageBatchRequest> captor =
+ ArgumentCaptor.forClass(SendMessageBatchRequest.class);
+ verify(sqs, mode).sendMessageBatch(captor.capture());
+ return captor.getAllValues().stream().collect(groupingBy(SendMessageBatchRequest::queueUrl));
+ }
+
+ /** Asserts that the requests contain exactly the expected message bodies, each exactly once. */
+ private void assertMessageBodies(
+ @Nullable List<SendMessageBatchRequest> requests, IntStream expectedMsgs) {
+ assertThat(requests).isNotNull();
+ assertThat(
+ requests.stream()
+ .flatMap(req -> req.entries().stream())
+ .map(SendMessageBatchRequestEntry::messageBody)
+ .collect(toList()))
Review Comment:

Since `requests` is annotated with `@Nullable`, dereferencing it directly
with `requests.stream()` will trigger a Checker Framework nullness
warning/error (e.g., `dereference.of.nullable`). AssertJ's
`assertThat(requests).isNotNull()` is not natively recognized by the Checker
Framework as a null-check guard.
Using the already-imported `checkNotNull` utility from Guava resolves this
cleanly and ensures compile-time null safety.
```suggestion
List<SendMessageBatchRequest> safeRequests = checkNotNull(requests);
assertThat(
safeRequests.stream()
.flatMap(req -> req.entries().stream())
.map(SendMessageBatchRequestEntry::messageBody)
.collect(toList()))
```
##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java:
##########
@@ -263,10 +267,16 @@ public void testWriteBatchesWithTimeout() {
p.run().waitUntilFinish();
- SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
- // due to added delay, batches are timed out on arrival of every 3rd msg
- verify(sqs).sendMessageBatch(request("queue", entries[0], entries[1], entries[2]));
- verify(sqs).sendMessageBatch(request("queue", entries[3], entries[4]));
+ // Nominally batches time out on arrival of every 3rd message ([0,1,2], [3,4]). The exact
+ // grouping depends on wall clock time and is unreliable on loaded machines (#38946), so
+ // verify timing-independent invariants instead: expired batches are flushed on append, so
+ // with >= 100ms between messages no batch can exceed 3 entries before exceeding the 150ms
+ // timeout, forcing at least 2 batches for 5 messages.
+ Map<String, List<SendMessageBatchRequest>> requests = captureBatchRequests(atLeast(2));
+ assertThat(requests.keySet()).containsExactly("queue");
+ assertMessageBodies(requests.get("queue"), range(0, 5));
+ assertThat(requests.get("queue"))
+ .allSatisfy(req -> assertThat(req.entries().size()).isLessThanOrEqualTo(3));
Review Comment:

To satisfy the Checker Framework's nullness analysis, avoid passing the
nullable result of `requests.get("queue")` directly to `assertMessageBodies`
and `assertThat`. Wrapping it with `checkNotNull` ensures null safety and
avoids duplicate map lookups.
```java
List<SendMessageBatchRequest> queueRequests =
checkNotNull(requests.get("queue"));
assertMessageBodies(queueRequests, range(0, 5));
assertThat(queueRequests)
.allSatisfy(req ->
assertThat(req.entries().size()).isLessThanOrEqualTo(3));
```
##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java:
##########
@@ -337,11 +352,18 @@ public void testWriteBatchesToDynamicWithTimeout() {
p.run().waitUntilFinish();
- SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
- // due to added delay, dynamic batches are timed out on arrival of every 2nd msg (per batch)
- verify(sqs).sendMessageBatch(request("even", entries[0], entries[2]));
- verify(sqs).sendMessageBatch(request("uneven", entries[1], entries[3]));
- verify(sqs).sendMessageBatch(request("even", entries[4]));
+ // Nominally dynamic batches are timed out on arrival of every 2nd message per batch
+ // ([0,2], [1,3], [4]). The exact grouping depends on wall clock time and is unreliable on
+ // loaded machines (#38946), so verify timing-independent invariants instead: per queue,
+ // consecutive messages are >= 200ms apart, so expired batches are flushed on append once
+ // reaching 2 entries, requiring at least 3 batches overall.
+ Map<String, List<SendMessageBatchRequest>> requests = captureBatchRequests(atLeast(3));
+ assertThat(requests.keySet()).containsExactlyInAnyOrder("even", "uneven");
+ assertMessageBodies(requests.get("even"), range(0, 5).filter(i -> i % 2 == 0));
+ assertMessageBodies(requests.get("uneven"), range(0, 5).filter(i -> i % 2 == 1));
Review Comment:

To satisfy the Checker Framework's nullness analysis, avoid passing the
nullable results of `requests.get("even")` and `requests.get("uneven")`
directly to `assertMessageBodies`. Wrapping them with `checkNotNull` ensures
null safety.
```java
List<SendMessageBatchRequest> evenRequests =
checkNotNull(requests.get("even"));
List<SendMessageBatchRequest> unevenRequests =
checkNotNull(requests.get("uneven"));
assertMessageBodies(evenRequests, range(0, 5).filter(i -> i % 2 == 0));
assertMessageBodies(unevenRequests, range(0, 5).filter(i -> i % 2 == 1));
```
##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java:
##########
@@ -360,14 +382,19 @@ public void testWriteBatchesToDynamicWithStrictTimeout() {
p.run().waitUntilFinish();
- SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
- // using strict timeouts batches, batches are timed out by a separate thread before any 2nd
- // entry
- verify(sqs).sendMessageBatch(request("even", entries[0]));
- verify(sqs).sendMessageBatch(request("uneven", entries[1]));
- verify(sqs).sendMessageBatch(request("even", entries[2]));
- verify(sqs).sendMessageBatch(request("uneven", entries[3]));
- verify(sqs).sendMessageBatch(request("even", entries[4]));
+ // Nominally the separate timeout thread flushes every batch before a 2nd entry arrives
+ // (5 singleton batches). The exact grouping depends on wall clock time and is unreliable
+ // on loaded machines (#38946), so verify timing-independent invariants instead. Expired
+ // batches are also flushed on append (independently of the timeout thread) and per queue
+ // consecutive messages are >= 200ms apart, so no batch can exceed 2 entries and at least
+ // 3 batches are required overall.
+ Map<String, List<SendMessageBatchRequest>> requests = captureBatchRequests(atLeast(3));
+ assertThat(requests.keySet()).containsExactlyInAnyOrder("even", "uneven");
+ assertMessageBodies(requests.get("even"), range(0, 5).filter(i -> i % 2 == 0));
+ assertMessageBodies(requests.get("uneven"), range(0, 5).filter(i -> i % 2 == 1));
Review Comment:

To satisfy the Checker Framework's nullness analysis, avoid passing the
nullable results of `requests.get("even")` and `requests.get("uneven")`
directly to `assertMessageBodies`. Wrapping them with `checkNotNull` ensures
null safety.
```java
List<SendMessageBatchRequest> evenRequests =
checkNotNull(requests.get("even"));
List<SendMessageBatchRequest> unevenRequests =
checkNotNull(requests.get("uneven"));
assertMessageBodies(evenRequests, range(0, 5).filter(i -> i % 2 == 0));
assertMessageBodies(unevenRequests, range(0, 5).filter(i -> i % 2 == 1));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]