This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 6f27eacff767501c0ef0c9215c4eb7d71191a249 Author: seanyinx <[email protected]> AuthorDate: Tue Jan 16 18:43:31 2018 +0800 SCB-218 polling in background for events to compensate Signed-off-by: seanyinx <[email protected]> --- alpha/alpha-core/pom.xml | 15 +++++++ .../servicecomb/saga/alpha/core/Command.java | 20 +++++++++ .../saga/alpha/core/CommandRepository.java | 2 + .../saga/alpha/core/TxConsistentService.java | 51 +++++++++++++++++----- .../saga/alpha/core/TxConsistentServiceTest.java | 35 +++++++++++---- .../servicecomb/saga/alpha/server/AlphaConfig.java | 7 ++- .../saga/alpha/server/CommandEntity.java | 6 ++- .../saga/alpha/server/CommandEntityRepository.java | 14 +++++- .../saga/alpha/server/SpringCommandRepository.java | 20 ++++++++- .../alpha/server/TxEventEnvelopeRepository.java | 3 +- .../saga/alpha/server/AlphaIntegrationTest.java | 39 +++++++++++++---- 11 files changed, 174 insertions(+), 38 deletions(-) diff --git a/alpha/alpha-core/pom.xml b/alpha/alpha-core/pom.xml index aa74718..107fff4 100644 --- a/alpha/alpha-core/pom.xml +++ b/alpha/alpha-core/pom.xml @@ -44,6 +44,21 @@ </dependency> <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> </dependency> diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java index 08f8527..c852c16 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java @@ -83,6 +83,14 @@ public class Command { event.payloads()); } + String serviceName() { + return serviceName; + } + + String instanceId() { + return instanceId; + } + String globalTxId() { return globalTxId; } @@ -91,6 +99,18 @@ public class Command { return localTxId; } + String parentTxId() { + return parentTxId; + } + + String compensationMethod() { + return compensationMethod; + } + + byte[] payloads() { + return payloads; + } + String status() { return status; } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java index 915d476..22f2b41 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java @@ -29,4 +29,6 @@ public interface CommandRepository { void markCommandAsDone(String globalTxId, String localTxId); List<Command> findUncompletedCommands(String globalTxId); + + List<Command> findFirstCommandToCompensate(); } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java index 560096f..8f88735 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java @@ -17,21 +17,27 @@ package org.apache.servicecomb.saga.alpha.core; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; +import java.lang.invoke.MethodHandles; import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class TxConsistentService { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {}; private static final byte[] EMPTY_PAYLOAD = new byte[0]; @@ -46,13 +52,17 @@ public class TxConsistentService { }}; private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); public TxConsistentService(TxEventRepository eventRepository, CommandRepository commandRepository, - OmegaCallback omegaCallback) { + OmegaCallback omegaCallback, + int commandPollingInterval) { this.eventRepository = eventRepository; this.commandRepository = commandRepository; this.omegaCallback = omegaCallback; + + scheduleCompensationCommandPolling(commandPollingInterval); } public boolean handle(TxEvent event) { @@ -69,10 +79,6 @@ public class TxConsistentService { private void compensateIfAlreadyAborted(TxEvent event) { if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) { commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId()); - TxEvent correspondingStartedEvent = eventRepository - .findFirstTransaction(event.globalTxId(), event.localTxId(), TxStartedEvent.name()); - - omegaCallback.compensate(correspondingStartedEvent); } } @@ -81,22 +87,19 @@ public class TxConsistentService { } private void compensate(TxEvent event) { - List<TxEvent> events = eventRepository.findTransactionsToCompensate(event.globalTxId()); - - events.removeIf(this::isCompensationScheduled); - commandRepository.saveCompensationCommands(event.globalTxId()); - - events.forEach(omegaCallback::compensate); } // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late, // unless we ask user to specify a name for each participant in the global TX in @Compensable private void updateCompensateStatus(TxEvent event) { commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId()); + log.info("Transaction with globalTxId {} and localTxId {} was compensated", event.globalTxId(), event.localTxId()); + if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty() && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) { markGlobalTxEnd(event); + log.info("Marked end of transaction with globalTxId {}", event.globalTxId()); } } @@ -109,4 +112,28 @@ public class TxConsistentService { private boolean isGlobalTxAborted(TxEvent event) { return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty(); } + + private void scheduleCompensationCommandPolling(int commandPollingInterval) { + scheduler.scheduleWithFixedDelay( + () -> commandRepository.findFirstCommandToCompensate() + .forEach(command -> { + log.info("Compensating transaction with globalTxId {} and localTxId {}", + command.globalTxId(), + command.localTxId()); + + omegaCallback.compensate(new TxEvent( + command.serviceName(), + command.instanceId(), + command.globalTxId(), + command.localTxId(), + command.parentTxId(), + TxStartedEvent.name(), + command.compensationMethod(), + command.payloads() + )); + }), + 0, + commandPollingInterval, + MILLISECONDS); + } } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index 8ae60a3..212c621 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -42,6 +42,7 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.servicecomb.saga.common.EventType; @@ -140,6 +141,17 @@ public class TxConsistentServiceTest { .filter(command -> command.globalTxId().equals(globalTxId) && !DONE.name().equals(command.status())) .collect(Collectors.toList()); } + + @Override + public List<Command> findFirstCommandToCompensate() { + List<Command> results = new ArrayList<>(1); + commands.stream() + .filter(command -> !DONE.name().equals(command.status())) + .findFirst() + .ifPresent(results::add); + + return results; + } }; private final String globalTxId = UUID.randomUUID().toString(); @@ -151,10 +163,18 @@ public class TxConsistentServiceTest { private final String compensationMethod = getClass().getCanonicalName(); private final List<CompensationContext> compensationContexts = new ArrayList<>(); - private final OmegaCallback omegaCallback = event -> - compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads())); + private Consumer<TxEvent> eventConsumer = event -> {}; + private final OmegaCallback omegaCallback = event -> { + eventConsumer.accept(event); + compensationContexts.add( + new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads())); + }; - private final TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback); + private final TxConsistentService consistentService = new TxConsistentService( + eventRepository, + commandRepository, + omegaCallback, + 300); @Test public void persistEventOnArrival() throws Exception { @@ -175,6 +195,9 @@ public class TxConsistentServiceTest { @Test public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception { + eventConsumer = event -> consistentService + .handle(eventOf(TxCompensatedEvent, new byte[0], event.localTxId(), event.compensationMethod())); + String localTxId1 = UUID.randomUUID().toString(); events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a")); events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a")); @@ -193,12 +216,6 @@ public class TxConsistentServiceTest { new CompensationContext(globalTxId, localTxId2, "method b", "service b".getBytes()) )); - TxEvent compensateEvent2 = eventOf(TxCompensatedEvent, "service b".getBytes(), localTxId2, "method b"); - consistentService.handle(compensateEvent2); - - TxEvent compensateEvent1 = eventOf(TxCompensatedEvent, "service a".getBytes(), localTxId1, "method a"); - consistentService.handle(compensateEvent1); - await().atMost(1, SECONDS).until(() -> events.size() == 8); assertThat(events.pollLast().type(), is(SagaEndedEvent.name())); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java index 00dfe27..9c34738 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java @@ -66,12 +66,17 @@ class AlphaConfig { @Bean TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port, + @Value("${alpha.command.pollingInterval:3000}") int commandPollingInterval, TxEventRepository eventRepository, CommandRepository commandRepository, OmegaCallback omegaCallback, Map<String, Map<String, OmegaCallback>> omegaCallbacks) { - TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback); + TxConsistentService consistentService = new TxConsistentService( + eventRepository, + commandRepository, + omegaCallback, + commandPollingInterval); ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks); new Thread(startable::start).start(); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java index 3eac681..6207002 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java @@ -21,7 +21,6 @@ import java.util.Date; import javax.persistence.Embedded; import javax.persistence.Entity; -import javax.persistence.GeneratedValue; import javax.persistence.Id; import javax.persistence.Version; @@ -31,7 +30,6 @@ import org.apache.servicecomb.saga.alpha.core.TxEvent; @Entity class CommandEntity { @Id - @GeneratedValue private long surrogateId; @Embedded @@ -50,4 +48,8 @@ class CommandEntity { lastModified = new Date(); command = new Command(event); } + + Command command() { + return command; + } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java index 4b7309e..9402486 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java @@ -20,7 +20,9 @@ package org.apache.servicecomb.saga.alpha.server; import java.util.List; import java.util.Optional; -import org.apache.servicecomb.saga.alpha.core.Command; +import javax.transaction.Transactional; + +import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; @@ -29,6 +31,7 @@ import org.springframework.data.repository.query.Param; public interface CommandEntityRepository extends CrudRepository<CommandEntity, Long> { Optional<CommandEntity> findByCommandGlobalTxIdAndCommandLocalTxId(String globalTxId, String localTxId); + @Transactional @Modifying @Query("UPDATE org.apache.servicecomb.saga.alpha.server.CommandEntity c " + "SET c.command.status = :status " @@ -39,5 +42,12 @@ public interface CommandEntityRepository extends CrudRepository<CommandEntity, L @Param("globalTxId") String globalTxId, @Param("localTxId") String localTxId); - List<Command> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status); + List<CommandEntity> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status); + + @Query("FROM CommandEntity c " + + "WHERE id IN (" + + " SELECT MAX(id) FROM CommandEntity c1 WHERE c1.command.status <> 'DONE' GROUP BY c1.command.globalTxId" + + ") " + + "ORDER BY c.id ASC") + List<CommandEntity> findFirstGroupByCommandGlobalTxIdOrderByIdDesc(Pageable pageable); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java index 9281b7e..aac3c22 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java @@ -24,11 +24,15 @@ import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.servicecomb.saga.alpha.core.Command; import org.apache.servicecomb.saga.alpha.core.CommandRepository; +import org.springframework.data.domain.PageRequest; public class SpringCommandRepository implements CommandRepository { + private static final PageRequest SINGLE_COMMAND_REQUEST = new PageRequest(0, 1); + private final TxEventEnvelopeRepository eventRepository; private final CommandEntityRepository commandRepository; @@ -44,7 +48,7 @@ public class SpringCommandRepository implements CommandRepository { @Override public void saveCompensationCommand(String globalTxId, String localTxId) { - TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType( + TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventTypeOrderByIdAsc( globalTxId, localTxId, TxStartedEvent.name()); @@ -73,6 +77,18 @@ public class SpringCommandRepository implements CommandRepository { @Override public List<Command> findUncompletedCommands(String globalTxId) { - return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name()); + return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name()) + .stream() + .map(CommandEntity::command) + .collect(Collectors.toList()); + } + + @Override + public List<Command> findFirstCommandToCompensate() { + return commandRepository + .findFirstGroupByCommandGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST) + .stream() + .map(CommandEntity::command) + .collect(Collectors.toList()); } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index fcb7c00..85ed954 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -63,6 +63,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { + " FROM TxEventEnvelope t2 " + " WHERE t2.event.globalTxId = ?1 " + " AND t2.event.localTxId = t.event.localTxId " - + " AND t2.event.type = 'TxCompensatedEvent')") + + " AND t2.event.type = 'TxCompensatedEvent')" + + "ORDER BY t.id ASC ") List<TxEventEnvelope> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId); } diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index 6d4f91f..1514869 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -24,7 +24,7 @@ import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import static org.awaitility.Awaitility.await; -import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; @@ -34,11 +34,12 @@ import java.util.Map; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Consumer; -import org.apache.servicecomb.saga.common.EventType; import org.apache.servicecomb.saga.alpha.core.OmegaCallback; import org.apache.servicecomb.saga.alpha.core.TxConsistentService; import org.apache.servicecomb.saga.alpha.core.TxEvent; +import org.apache.servicecomb.saga.common.EventType; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig; @@ -63,7 +64,8 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; @RunWith(SpringRunner.class) -@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, properties = "alpha.server.port=8090") +@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, + properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=300"}) public class AlphaIntegrationTest { private static final int port = 8090; @@ -97,7 +99,7 @@ public class AlphaIntegrationTest { private TxConsistentService consistentService; private static final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>(); - private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver(); + private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver(this::onCompensation); @AfterClass public static void tearDown() throws Exception { @@ -222,11 +224,11 @@ public class AlphaIntegrationTest { blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1); - assertThat(receivedCommands, containsInAnyOrder( - GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId) - .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build(), + assertThat(receivedCommands, contains( GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1) - .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build() + .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(), + GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId) + .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build() )); } @@ -305,7 +307,7 @@ public class AlphaIntegrationTest { String anotherLocalTxId2 = UUID.randomUUID().toString(); blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId2)); - await().atMost(1, SECONDS).until(() -> eventRepo.count() == 6); + await().atMost(1, SECONDS).until(() -> eventRepo.count() == 7); blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2)); await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty()); @@ -314,6 +316,15 @@ public class AlphaIntegrationTest { assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId)); } + private GrpcAck onCompensation(GrpcCompensateCommand command) { + return blockingStub.onTxEvent( + eventOf(TxCompensatedEvent, + command.getLocalTxId(), + command.getParentTxId(), + new byte[0], + command.getCompensateMethod())); + } + private GrpcServiceConfig someServiceConfig() { return GrpcServiceConfig.newBuilder() .setServiceName(uniquify("serviceName")) @@ -371,11 +382,21 @@ public class AlphaIntegrationTest { } private static class CompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> { + private final Consumer<GrpcCompensateCommand> consumer; private boolean completed = false; + private CompensateStreamObserver() { + this(command -> {}); + } + + private CompensateStreamObserver(Consumer<GrpcCompensateCommand> consumer) { + this.consumer = consumer; + } + @Override public void onNext(GrpcCompensateCommand command) { // intercept received command + consumer.accept(command); receivedCommands.add(command); } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
