[https://issues.apache.org/jira/browse/BEAM-9470?focusedWorklogId=402504&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-402504]
ASF GitHub Bot logged work on BEAM-9470:
----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Mar/20 21:39
Start Date: 12/Mar/20 21:39
Worklog Time Spent: 10m
Work Description: jfarr commented on pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391914284
##########
File path:
sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
##########
@@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
}
}
+ ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+ verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+ List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+ assertThat(capturedRecords).contains(
+ ImmutableList.of(a, b),
+ singletonList(c),
+ singletonList(d),
+ Collections.emptyList()
+ );
verify(customRateLimitPolicy).onThrottle(same(e));
- verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
- verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
- verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
Review comment:
OK, I see what's happening here. It's basically a race condition. This loop
will exit as soon as nextRecord() returns the last record:
https://github.com/apache/beam/blob/b0e7afbc3baf4ac17f7159cb8aa1fabe326be3e2/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java#L320-L325
I believe the test sometimes reaches the verify() call for that last record
in the brief window after shardReadersPool puts the record into the blocking
queue but before it calls onSuccess(). The fix is to use Mockito's timeout()
so that verify() waits long enough for that last onSuccess() call. It took me
about 30,000 iterations to reproduce the failure with the original code, but I
have run the new test code over 100,000 times without a failure.
I do still think it's a good idea to make these changes to ShardReadersPool
to ensure that onSuccess() always gets called (as long as the thread wasn't
interrupted).
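The race described in the review comment can be reproduced in miniature with only the JDK. This is a minimal sketch, not the Beam test or ShardReadersPool code: CountingPolicy, awaitCalls and runOnce are hypothetical names, and the polling loop in awaitCalls only approximates what Mockito's verify(mock, timeout(ms)) does. The producer puts each record on a blocking queue before invoking the success callback, so a consumer that checks the callback count immediately after taking the last record can miss the final call; waiting against a deadline closes that window.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: none of these names exist in Beam or Mockito.
public class TimeoutVerifySketch {

    /** Stand-in for the rate limit policy mock: it only counts onSuccess() calls. */
    static final class CountingPolicy {
        final AtomicInteger successCalls = new AtomicInteger();

        void onSuccess() {
            successCalls.incrementAndGet();
        }
    }

    /**
     * Polls until at least {@code expected} onSuccess() calls have been seen or the
     * timeout elapses; a rough analogue of Mockito's verify(mock, timeout(ms)).
     */
    static boolean awaitCalls(CountingPolicy policy, int expected, long timeoutMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (policy.successCalls.get() < expected) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: the callback never arrived in time
            }
            Thread.sleep(1);
        }
        return true;
    }

    /** Runs one producer/consumer round; true if every onSuccess() call was observed. */
    static boolean runOnce(int records, long timeoutMs) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(records);
        CountingPolicy policy = new CountingPolicy();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < records; i++) {
                try {
                    queue.put(i);    // the consumer can see the record from this point...
                    Thread.sleep(5); // ...while the callback below has not run yet
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                policy.onSuccess();
            }
        });
        producer.start();
        try {
            for (int i = 0; i < records; i++) {
                queue.take(); // like the test loop, stop right after the last record arrives
            }
            // Reading successCalls here without waiting would race with the producer.
            boolean ok = awaitCalls(policy, records, timeoutMs);
            producer.join();
            return ok;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce(3, 1000));
    }
}
```

A fixed sleep before the check would also usually pass, but a deadline-based wait returns as soon as the last call lands and only fails when the callback is genuinely missing, which is the same trade-off timeout() makes in a Mockito verification.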
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 402504)
Time Spent: 6h 10m (was: 6h)
> :sdks:java:io:kinesis:test is flaky
> -----------------------------------
>
> Key: BEAM-9470
> URL: https://issues.apache.org/jira/browse/BEAM-9470
> Project: Beam
> Issue Type: Test
> Components: io-java-kinesis
> Reporter: Etienne Chauchot
> Assignee: Jonothan Farr
> Priority: Major
> Time Spent: 6h 10m
> Remaining Estimate: 0h
>
> [https://scans.gradle.com/s/b4jmmu72ku5jc/console-log?task=:sdks:java:io:kinesis:test]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)