This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch SCB-2394 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 38a7284b782ea93b17ad207ef86580b230ee8723 Author: Willem Jiang <[email protected]> AuthorDate: Fri Feb 25 10:06:19 2022 +0800 SCB-2394 Try to fix the github action random test error --- .../server/AlphaIntegrationWithRandomPortTest.java | 45 ++++++++++++---------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java index f6a026b..5e05e2d 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java @@ -117,6 +117,9 @@ public class AlphaIntegrationWithRandomPortTest { @Autowired private TxConsistentService consistentService; + // Setup the most wait time for checking the result, + // the number is used to fix random test errors in a slow box + private static final Integer waitTime = 4; private static final AtomicInteger receivedCommandsCounter = new AtomicInteger(); private static final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>(); @@ -167,7 +170,7 @@ public class AlphaIntegrationWithRandomPortTest { asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig); blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent)); // use the asynchronous stub need to wait for some time - await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty()); + await().atMost(waitTime, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty()); assertThat(receivedCommands.isEmpty(), is(true)); @@ -187,7 +190,7 @@ public class AlphaIntegrationWithRandomPortTest { public void closeStreamOnDisconnected() { asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig); - 
await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName())); + await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName())); assertThat( omegaCallbacks.get(serviceConfig.getServiceName()).get(serviceConfig.getInstanceId()), @@ -198,20 +201,20 @@ public class AlphaIntegrationWithRandomPortTest { omegaCallbacks.get(serviceConfig.getServiceName()).containsKey(serviceConfig.getInstanceId()), is(false)); - await().atMost(1, SECONDS).until(compensateResponseObserver::isCompleted); + await().atMost(waitTime, SECONDS).until(compensateResponseObserver::isCompleted); } @Test public void closeStreamOfDisconnectedClientOnly() { asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig); - await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName())); + await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName())); GrpcServiceConfig anotherServiceConfig = someServiceConfig(); CompensationStreamObserver anotherResponseObserver = new CompensationStreamObserver(); TxEventServiceStub otherAsyncStub = TxEventServiceGrpc.newStub(clientChannel); otherAsyncStub.onConnected(anotherResponseObserver).onNext(anotherServiceConfig); - await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName())); + await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName())); blockingStub.onDisconnected(serviceConfig); @@ -234,7 +237,7 @@ public class AlphaIntegrationWithRandomPortTest { consistentService.handle(someTxAbortEvent(serviceName, instanceId)); - await().atMost(1, SECONDS).until(() -> omegaCallbacks.get(serviceName).isEmpty()); + await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.get(serviceName).isEmpty()); } @Test @@ -244,7 +247,7 @@ public class AlphaIntegrationWithRandomPortTest { 
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], compensationMethod)); - await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty()); + await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty()); GrpcCompensateCommand command = receivedCommands.poll(); assertThat(command.getGlobalTxId(), is(globalTxId)); @@ -268,7 +271,7 @@ public class AlphaIntegrationWithRandomPortTest { blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new byte[0], "method b")); blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); - await().atMost(2, SECONDS).until(() -> receivedCommandsCounter.get() > 1); + await().atMost(waitTime, SECONDS).until(() -> receivedCommandsCounter.get() > 1); assertThat(receivedCommands, containsInAnyOrder( GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1) @@ -283,10 +286,10 @@ public class AlphaIntegrationWithRandomPortTest { asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig); blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent)); blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent)); - await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty()); + await().atMost(waitTime, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty()); blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); - await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty()); + await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty()); GrpcCompensateCommand command = receivedCommands.poll(); assertThat(command.getGlobalTxId(), is(globalTxId)); @@ -310,10 +313,10 @@ public class AlphaIntegrationWithRandomPortTest { TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel); anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString())); - 
await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3); + await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 3); blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); - await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty()); + await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty()); assertThat(receivedCommandsCounter.get(), is(1)); assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId)); @@ -330,7 +333,7 @@ public class AlphaIntegrationWithRandomPortTest { blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); - await().atMost(1000, SECONDS).until(() -> receivedCommandsCounter.get() == 1); + await().atMost(waitTime, SECONDS).until(() -> receivedCommandsCounter.get() == 1); String localTxId1 = UUID.randomUUID().toString(); String parentTxId1 = UUID.randomUUID().toString(); @@ -354,10 +357,10 @@ public class AlphaIntegrationWithRandomPortTest { String anotherLocalTxId2 = UUID.randomUUID().toString(); blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId2)); - await().atMost(1, SECONDS).until(() -> eventRepo.count() == 7); + await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 7); blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2)); - await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty()); + await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty()); assertThat(receivedCommandsCounter.get(), is(1)); assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId)); @@ -375,7 +378,7 @@ public class AlphaIntegrationWithRandomPortTest { blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId)); - await().atMost(1, SECONDS).until(() -> { + await().atMost(waitTime, SECONDS).until(() -> { List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); return events.size() == 8 && events.get(events.size() - 1).type().equals(SagaEndedEvent.name()); }); @@ -386,14 +389,14 @@ public 
class AlphaIntegrationWithRandomPortTest { asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig); blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1)); - await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3); + await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 3); List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); assertThat(events.get(0).type(), is(SagaStartedEvent.name())); assertThat(events.get(1).type(), is(TxAbortedEvent.name())); assertThat(events.get(2).type(), is(SagaEndedEvent.name())); - await().atMost(2, SECONDS).until(this::waitTillTimeoutDone); + await().atMost(waitTime, SECONDS).until(this::waitTillTimeoutDone); assertThat(timeoutEntityRepository.count(), is(1L)); Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll(); @@ -410,7 +413,7 @@ public class AlphaIntegrationWithRandomPortTest { blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId, null)); blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1)); - await().atMost(2, SECONDS).until(() -> { + await().atMost(waitTime, SECONDS).until(() -> { List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); return eventRepo.count() == 5 && events.get(events.size() - 1).type().equals(SagaEndedEvent.name()); }); @@ -427,7 +430,7 @@ public class AlphaIntegrationWithRandomPortTest { assertThat(events.get(4).type(), is(TxCompensatedEvent.name())); } - await().atMost(2, SECONDS).until(this::waitTillTimeoutDone); + await().atMost(waitTime, SECONDS).until(this::waitTillTimeoutDone); assertThat(timeoutEntityRepository.count(), is(1L)); Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll(); @@ -446,7 +449,7 @@ public class AlphaIntegrationWithRandomPortTest { blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 0)); blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent)); - await().atMost(1, SECONDS).until(() -> 
eventRepo.count() == 4); + await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 4); List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); assertThat(events.size(), is(4));
