This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new a7922c63d [tests] Fix flakiness in TestAsyncKuduSession.java
a7922c63d is described below
commit a7922c63d064f68a32b1829e6423cde68f76133b
Author: kedeng <[email protected]>
AuthorDate: Fri Jul 21 14:28:38 2023 +0800
[tests] Fix flakiness in TestAsyncKuduSession.java
The test testFlushBySize within TestAsyncKuduSession.java
has been flaky, with a 39% failure rate over the last few weeks.
The main reason for the failures is that the newly added unit
test did not account for the impact of the cache. This patch
modifies the relevant code of the unit test to verify that the
newly added flush strategy is indeed effective by performing a
large number of insert operations, exceeding the number of
cache buffers.
Change-Id: Iadc61fcddb9ffbfe05ef398ba61c79f8063d72de
Reviewed-on: http://gerrit.cloudera.org:8080/20238
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
.../apache/kudu/client/TestAsyncKuduSession.java | 35 ++++++++++++++--------
1 file changed, 22 insertions(+), 13 deletions(-)
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index 49e3f0662..02876c5d5 100644
---
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -396,7 +396,9 @@ public class TestAsyncKuduSession {
public void testFlushBySize() throws Exception {
AsyncKuduSession session = client.newSession();
final int kBufferSizeOps = 10;
- final int kNumOps = 2;
+ // Considering the existence of buffers, we set a number of operations
that is significantly
+ // larger than the number of buffers to ensure that the buffers are
triggered to flush.
+ final int kNumOps = 100;
// Set a small buffer size so we should flush every time.
session.setMutationBufferSpace(kBufferSizeOps, 1);
// Set a large flush interval so if the flush by size function is not
correctly implemented,
@@ -405,21 +407,28 @@ public class TestAsyncKuduSession {
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
for (int i = 0; i < kNumOps; i++) {
- // Should always flush immediately so here join will return soon.
- OperationResponse resp =
session.apply(createInsert(i)).join(DEFAULT_SLEEP);
- assertFalse(resp.hasRowError());
+ // If the client tries to buffer many more operations, it may receive a
+ // PleaseThrottleException. In this case, if the client simply waits for
a flush notification
+ // on the Deferred returned with the exception, it can continue to
buffer operations.
+ Insert insert = createInsert(i);
+ try {
+ session.apply(insert);
+ } catch (PleaseThrottleException ex) {
+ ex.getDeferred().join(DEFAULT_SLEEP);
+ session.apply(insert);
+ }
}
- // Mode AUTO_FLUSH_BACKGROUND also takes time, so we may need wait here.
- assertEventuallyTrue(String.format("Timeout for flush pending operations"),
- new BooleanExpression() {
- @Override
- public boolean get() throws Exception {
- return !session.hasPendingOperations();
- }
- }, /* timeoutMillis = */500000);
+ // There might be pending requests in the cache, but the above operation
should not generate any
+ // errors.
assertEquals(0, session.countPendingErrors());
// Confirm that we can still make progress.
- session.apply(createInsert(kNumOps)).join(DEFAULT_SLEEP);
+ Insert insert = createInsert(kNumOps);
+ try {
+ session.apply(insert);
+ } catch (PleaseThrottleException ex) {
+ ex.getDeferred().join(DEFAULT_SLEEP);
+ session.apply(insert);
+ }
for (OperationResponse resp: session.flush().join(DEFAULT_SLEEP)) {
assertFalse(resp.hasRowError());