tkaymak commented on code in PR #38971:
URL: https://github.com/apache/beam/pull/38971#discussion_r3416142261
##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java:
##########
@@ -406,6 +433,28 @@ private SendMessageBatchRequest anyRequest() {
return any();
}
+ /** Captures all batch requests, verifying the given mode, and groups them
by queue url. */
+ private Map<String, List<SendMessageBatchRequest>> captureBatchRequests(
+ org.mockito.verification.VerificationMode mode) {
+ ArgumentCaptor<SendMessageBatchRequest> captor =
+ ArgumentCaptor.forClass(SendMessageBatchRequest.class);
+ verify(sqs, mode).sendMessageBatch(captor.capture());
+ return
captor.getAllValues().stream().collect(groupingBy(SendMessageBatchRequest::queueUrl));
+ }
+
+ /** Asserts that the requests contain exactly the expected message bodies,
each exactly once. */
+ private void assertMessageBodies(
+ @Nullable List<SendMessageBatchRequest> requests, IntStream
expectedMsgs) {
+ assertThat(requests).isNotNull();
+ assertThat(
+ requests.stream()
+ .flatMap(req -> req.entries().stream())
+ .map(SendMessageBatchRequestEntry::messageBody)
+ .collect(toList()))
Review Comment:
Fixed in `62a4c6d524e`: batch requests are now fetched through a non-null
helper `requestsFor(requests, queue)` backed by Beam's `checkStateNotNull`, and
`assertMessageBodies` now takes a non-null list. Verified with
`-PenableCheckerFramework`: `compileTestJava` now completes with no Checker Framework errors.
##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java:
##########
@@ -263,10 +267,16 @@ public void testWriteBatchesWithTimeout() {
p.run().waitUntilFinish();
- SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
- // due to added delay, batches are timed out on arrival of every 3rd msg
- verify(sqs).sendMessageBatch(request("queue", entries[0], entries[1],
entries[2]));
- verify(sqs).sendMessageBatch(request("queue", entries[3], entries[4]));
+ // Nominally batches time out on arrival of every 3rd message ([0,1,2],
[3,4]). The exact
+ // grouping depends on wall clock time and is unreliable on loaded
machines (#38946), so
+ // verify timing-independent invariants instead: expired batches are
flushed on append, so
+ // with >= 100ms between messages no batch can exceed 3 entries before
exceeding the 150ms
+ // timeout, forcing at least 2 batches for 5 messages.
+ Map<String, List<SendMessageBatchRequest>> requests =
captureBatchRequests(atLeast(2));
+ assertThat(requests.keySet()).containsExactly("queue");
+ assertMessageBodies(requests.get("queue"), range(0, 5));
+ assertThat(requests.get("queue"))
+ .allSatisfy(req ->
assertThat(req.entries().size()).isLessThanOrEqualTo(3));
Review Comment:
Fixed in `62a4c6d524e`: batch requests are now fetched through a non-null
helper `requestsFor(requests, queue)` backed by Beam's `checkStateNotNull`, and
`assertMessageBodies` now takes a non-null list. Verified with
`-PenableCheckerFramework`: `compileTestJava` now completes with no Checker Framework errors.
##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java:
##########
@@ -285,11 +295,16 @@ public void testWriteBatchesWithStrictTimeout() {
p.run().waitUntilFinish();
- SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
- // using strict timeouts batches, batches are timed out by a separate
thread
- verify(sqs).sendMessageBatch(request("queue", entries[0], entries[1]));
- verify(sqs).sendMessageBatch(request("queue", entries[2], entries[3]));
- verify(sqs).sendMessageBatch(request("queue", entries[4]));
+ // Nominally the separate timeout thread flushes [0,1], [2,3], [4]. The
exact grouping
+ // depends on wall clock time and is unreliable on loaded machines
(#38946), so verify
+ // timing-independent invariants instead. Expired batches are also flushed
on append
+ // (independently of the timeout thread), so no batch can exceed 3 entries
and 5 messages
+ // require at least 2 batches.
+ Map<String, List<SendMessageBatchRequest>> requests =
captureBatchRequests(atLeast(2));
+ assertThat(requests.keySet()).containsExactly("queue");
+ assertMessageBodies(requests.get("queue"), range(0, 5));
+ assertThat(requests.get("queue"))
+ .allSatisfy(req ->
assertThat(req.entries().size()).isLessThanOrEqualTo(3));
Review Comment:
Fixed in `62a4c6d524e`: batch requests are now fetched through a non-null
helper `requestsFor(requests, queue)` backed by Beam's `checkStateNotNull`, and
`assertMessageBodies` now takes a non-null list. Verified with
`-PenableCheckerFramework`: `compileTestJava` now completes with no Checker Framework errors.
##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java:
##########
@@ -337,11 +352,18 @@ public void testWriteBatchesToDynamicWithTimeout() {
p.run().waitUntilFinish();
- SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
- // due to added delay, dynamic batches are timed out on arrival of every
2nd msg (per batch)
- verify(sqs).sendMessageBatch(request("even", entries[0], entries[2]));
- verify(sqs).sendMessageBatch(request("uneven", entries[1], entries[3]));
- verify(sqs).sendMessageBatch(request("even", entries[4]));
+ // Nominally dynamic batches are timed out on arrival of every 2nd message
per batch
+ // ([0,2], [1,3], [4]). The exact grouping depends on wall clock time and
is unreliable on
+ // loaded machines (#38946), so verify timing-independent invariants
instead: per queue,
+ // consecutive messages are >= 200ms apart, so expired batches are flushed
on append once
+ // reaching 2 entries, requiring at least 3 batches overall.
+ Map<String, List<SendMessageBatchRequest>> requests =
captureBatchRequests(atLeast(3));
+ assertThat(requests.keySet()).containsExactlyInAnyOrder("even", "uneven");
+ assertMessageBodies(requests.get("even"), range(0, 5).filter(i -> i % 2 ==
0));
+ assertMessageBodies(requests.get("uneven"), range(0, 5).filter(i -> i % 2
== 1));
Review Comment:
Fixed in `62a4c6d524e`: batch requests are now fetched through a non-null
helper `requestsFor(requests, queue)` backed by Beam's `checkStateNotNull`, and
`assertMessageBodies` now takes a non-null list. Verified with
`-PenableCheckerFramework`: `compileTestJava` now completes with no Checker Framework errors.
##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java:
##########
@@ -360,14 +382,19 @@ public void testWriteBatchesToDynamicWithStrictTimeout() {
p.run().waitUntilFinish();
- SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
- // using strict timeouts batches, batches are timed out by a separate
thread before any 2nd
- // entry
- verify(sqs).sendMessageBatch(request("even", entries[0]));
- verify(sqs).sendMessageBatch(request("uneven", entries[1]));
- verify(sqs).sendMessageBatch(request("even", entries[2]));
- verify(sqs).sendMessageBatch(request("uneven", entries[3]));
- verify(sqs).sendMessageBatch(request("even", entries[4]));
+ // Nominally the separate timeout thread flushes every batch before a 2nd
entry arrives
+ // (5 singleton batches). The exact grouping depends on wall clock time
and is unreliable
+ // on loaded machines (#38946), so verify timing-independent invariants
instead. Expired
+ // batches are also flushed on append (independently of the timeout
thread) and per queue
+ // consecutive messages are >= 200ms apart, so no batch can exceed 2
entries and at least
+ // 3 batches are required overall.
+ Map<String, List<SendMessageBatchRequest>> requests =
captureBatchRequests(atLeast(3));
+ assertThat(requests.keySet()).containsExactlyInAnyOrder("even", "uneven");
+ assertMessageBodies(requests.get("even"), range(0, 5).filter(i -> i % 2 ==
0));
+ assertMessageBodies(requests.get("uneven"), range(0, 5).filter(i -> i % 2
== 1));
Review Comment:
Fixed in `62a4c6d524e`: batch requests are now fetched through a non-null
helper `requestsFor(requests, queue)` backed by Beam's `checkStateNotNull`, and
`assertMessageBodies` now takes a non-null list. Verified with
`-PenableCheckerFramework`: `compileTestJava` now completes with no Checker Framework errors.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]