This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit fff84355f893699b3e6903ac6ba362cf7960cf41 Author: seanyinx <[email protected]> AuthorDate: Fri Jan 19 10:28:22 2018 +0800 SCB-218 polling events into commands in the background Signed-off-by: seanyinx <[email protected]> --- .../saga/alpha/core/CommandRepository.java | 5 +- ...{TxConsistentService.java => EventScanner.java} | 139 +++++++------- .../saga/alpha/core/TxConsistentService.java | 102 +--------- .../saga/alpha/core/TxEventRepository.java | 5 +- .../saga/alpha/core/TxConsistentServiceTest.java | 208 ++------------------- .../servicecomb/saga/alpha/server/AlphaConfig.java | 12 +- .../saga/alpha/server/SpringCommandRepository.java | 25 +-- .../saga/alpha/server/SpringTxEventRepository.java | 10 +- .../alpha/server/TxEventEnvelopeRepository.java | 31 ++- .../saga/alpha/server/AlphaIntegrationTest.java | 12 +- 10 files changed, 126 insertions(+), 423 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java index 22f2b41..1da033f 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java @@ -20,11 +20,8 @@ package org.apache.servicecomb.saga.alpha.core; import java.util.List; public interface CommandRepository { - boolean exists(String globalTxId, String localTxId); - void saveCompensationCommand(String globalTxId, String localTxId); - - void saveCompensationCommands(String globalTxId); + Iterable<Command> saveCompensationCommands(String globalTxId); void markCommandAsDone(String globalTxId, String localTxId); diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java similarity index 50% copy from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java copy to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java index 61beea5..ea4aaf6 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java @@ -19,111 +19,116 @@ package org.apache.servicecomb.saga.alpha.core; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; -import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.lang.invoke.MethodHandles; import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TxConsistentService { +public class EventScanner implements Runnable { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {}; - private static final byte[] EMPTY_PAYLOAD = new byte[0]; + private final ScheduledExecutorService scheduler; private final TxEventRepository eventRepository; private final CommandRepository commandRepository; private final OmegaCallback omegaCallback; - private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{ - put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event)); - put(TxAbortedEvent.name(), (event) -> compensate(event)); - put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event)); - }}; - - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - private final ScheduledExecutorService scheduler; - - public TxConsistentService(TxEventRepository eventRepository, - CommandRepository commandRepository, - OmegaCallback omegaCallback, - int commandPollingInterval) { + private final int commandPollingInterval; + private final int eventPollingInterval; - this(eventRepository, commandRepository, omegaCallback, commandPollingInterval, Executors.newSingleThreadScheduledExecutor()); - } + private long nextEndedEventId; + private long nextCompensatedEventId; - public TxConsistentService(TxEventRepository eventRepository, + public EventScanner(ScheduledExecutorService scheduler, + TxEventRepository eventRepository, CommandRepository commandRepository, OmegaCallback omegaCallback, int commandPollingInterval, - ScheduledExecutorService scheduler) { + int eventPollingInterval) { + + this.scheduler = scheduler; this.eventRepository = eventRepository; this.commandRepository = commandRepository; this.omegaCallback = omegaCallback; - this.scheduler = scheduler; - - scheduleCompensationCommandPolling(commandPollingInterval); + this.commandPollingInterval = commandPollingInterval; + this.eventPollingInterval = eventPollingInterval; } - public boolean handle(TxEvent event) { - if (TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event)) { - return false; - } - - eventRepository.save(event); - - executor.execute(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event)); - return true; + @Override + public void run() { + pollCompensationCommand(commandPollingInterval); + pollEvents(); } - private void compensateIfAlreadyAborted(TxEvent event) { - if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) { - commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId()); - } + private void pollEvents() { + scheduler.scheduleWithFixedDelay( + () -> { + saveUncompensatedEventsToCommands(); + updateCompensatedCommands(); + }, + 0, + eventPollingInterval, + MILLISECONDS); } - private boolean isCompensationScheduled(TxEvent event) { - return commandRepository.exists(event.globalTxId(), event.localTxId()); + private void saveUncompensatedEventsToCommands() { + eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name()) + .forEach(event -> { + log.info("Found uncompensated event {}", event); + nextEndedEventId = event.id(); + commandRepository.saveCompensationCommands(event.globalTxId()) + .forEach(command -> nextEndedEventId = command.id()); + }); } - private void compensate(TxEvent event) { - commandRepository.saveCompensationCommands(event.globalTxId()); + private void updateCompensatedCommands() { + eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId, TxCompensatedEvent.name()) + .ifPresent(event -> { + log.info("Found compensated event {}", event); + nextCompensatedEventId = event.id(); + updateCompensationStatus(event); + }); } // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late, // unless we ask user to specify a name for each participant in the global TX in @Compensable - private void updateCompensateStatus(TxEvent event) { + private void updateCompensationStatus(TxEvent event) { commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId()); - log.info("Transaction with globalTxId {} and localTxId {} was compensated", event.globalTxId(), event.localTxId()); + log.info("Transaction with globalTxId {} and localTxId {} was compensated", + event.globalTxId(), + event.localTxId()); if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty() && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) { + markGlobalTxEnd(event); - log.info("Marked end of transaction with globalTxId {}", event.globalTxId()); } } private void markGlobalTxEnd(TxEvent event) { - eventRepository.save(new TxEvent( - event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(), - null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD)); + eventRepository.save(toSagaEndedEvent(event)); + log.info("Marked end of transaction with globalTxId {}", event.globalTxId()); } - private boolean isGlobalTxAborted(TxEvent event) { - return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty(); + private TxEvent toSagaEndedEvent(TxEvent event) { + return new TxEvent( + event.serviceName(), + event.instanceId(), + new Date(), + event.globalTxId(), + event.globalTxId(), + null, + SagaEndedEvent.name(), + "", + EMPTY_PAYLOAD); } - private void scheduleCompensationCommandPolling(int commandPollingInterval) { + private void pollCompensationCommand(int commandPollingInterval) { scheduler.scheduleWithFixedDelay( () -> commandRepository.findFirstCommandToCompensate() .forEach(command -> { @@ -131,19 +136,23 @@ public class TxConsistentService { command.globalTxId(), command.localTxId()); - omegaCallback.compensate(new TxEvent( - command.serviceName(), - command.instanceId(), - command.globalTxId(), - command.localTxId(), - command.parentTxId(), - TxStartedEvent.name(), - command.compensationMethod(), - command.payloads() - )); + omegaCallback.compensate(txStartedEventOf(command)); }), 0, commandPollingInterval, MILLISECONDS); } + + private TxEvent txStartedEventOf(Command command) { + return new TxEvent( + command.serviceName(), + command.instanceId(), + command.globalTxId(), + command.localTxId(), + command.parentTxId(), + TxStartedEvent.name(), + command.compensationMethod(), + command.payloads() + ); + } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java index 61beea5..c55090a 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java @@ -17,133 +17,35 @@ package org.apache.servicecomb.saga.alpha.core; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; -import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; -import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.lang.invoke.MethodHandles; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TxConsistentService { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {}; - - private static final byte[] EMPTY_PAYLOAD = new byte[0]; private final TxEventRepository eventRepository; - private final CommandRepository commandRepository; - private final OmegaCallback omegaCallback; - private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{ - put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event)); - put(TxAbortedEvent.name(), (event) -> compensate(event)); - put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event)); - }}; - - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - private final ScheduledExecutorService scheduler; - - public TxConsistentService(TxEventRepository eventRepository, - CommandRepository commandRepository, - OmegaCallback omegaCallback, - int commandPollingInterval) { - this(eventRepository, commandRepository, omegaCallback, commandPollingInterval, Executors.newSingleThreadScheduledExecutor()); - } - - public TxConsistentService(TxEventRepository eventRepository, - CommandRepository commandRepository, - OmegaCallback omegaCallback, - int commandPollingInterval, - ScheduledExecutorService scheduler) { + public TxConsistentService(TxEventRepository eventRepository) { this.eventRepository = eventRepository; - this.commandRepository = commandRepository; - this.omegaCallback = omegaCallback; - this.scheduler = scheduler; - - scheduleCompensationCommandPolling(commandPollingInterval); } public boolean handle(TxEvent event) { if (TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event)) { + log.info("Sub-transaction rejected, because its parent with globalTxId {} was already aborted", event.globalTxId()); return false; } eventRepository.save(event); - executor.execute(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event)); return true; } - private void compensateIfAlreadyAborted(TxEvent event) { - if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) { - commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId()); - } - } - - private boolean isCompensationScheduled(TxEvent event) { - return commandRepository.exists(event.globalTxId(), event.localTxId()); - } - - private void compensate(TxEvent event) { - commandRepository.saveCompensationCommands(event.globalTxId()); - } - - // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late, - // unless we ask user to specify a name for each participant in the global TX in @Compensable - private void updateCompensateStatus(TxEvent event) { - commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId()); - log.info("Transaction with globalTxId {} and localTxId {} was compensated", event.globalTxId(), event.localTxId()); - - if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty() - && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) { - markGlobalTxEnd(event); - log.info("Marked end of transaction with globalTxId {}", event.globalTxId()); - } - } - - private void markGlobalTxEnd(TxEvent event) { - eventRepository.save(new TxEvent( - event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(), - null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD)); - } - private boolean isGlobalTxAborted(TxEvent event) { return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty(); } - - private void scheduleCompensationCommandPolling(int commandPollingInterval) { - scheduler.scheduleWithFixedDelay( - () -> commandRepository.findFirstCommandToCompensate() - .forEach(command -> { - log.info("Compensating transaction with globalTxId {} and localTxId {}", - command.globalTxId(), - command.localTxId()); - - omegaCallback.compensate(new TxEvent( - command.serviceName(), - command.instanceId(), - command.globalTxId(), - command.localTxId(), - command.parentTxId(), - TxStartedEvent.name(), - command.compensationMethod(), - command.payloads() - )); - }), - 0, - commandPollingInterval, - MILLISECONDS); - } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java index cf5706b..d793de2 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java @@ -18,13 +18,14 @@ package org.apache.servicecomb.saga.alpha.core; import java.util.List; +import java.util.Optional; public interface TxEventRepository { void save(TxEvent event); List<TxEvent> findTransactions(String globalTxId, String type); - TxEvent findFirstTransaction(String globalTxId, String localTxId, String type); + List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type); - List<TxEvent> findTransactionsToCompensate(String globalTxId); + Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type); } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index 212c621..473501e 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -18,31 +18,23 @@ package org.apache.servicecomb.saga.alpha.core; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE; +import static java.util.Collections.emptyList; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; -import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Date; import java.util.Deque; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.servicecomb.saga.common.EventType; @@ -64,93 +56,13 @@ public class TxConsistentServiceTest { } @Override - public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) { - return events.stream() - .filter(event -> globalTxId.equals(event.globalTxId()) && localTxId.equals(event.localTxId()) && type.equals(event.type())) - .findFirst() - .get(); - } - - @Override - public List<TxEvent> findTransactionsToCompensate(String globalTxId) { - return events.stream() - .filter(event -> globalTxId.equals(event.globalTxId()) - && event.type().equals(TxStartedEvent.name()) - && isCompleted(globalTxId, event) - && !isCompensated(globalTxId, event)) - .collect(Collectors.toList()); - } - - private boolean isCompleted(String globalTxId, TxEvent event) { - return events.stream() - .filter(e -> globalTxId.equals(e.globalTxId()) - && e.localTxId().equals(event.localTxId()) - && e.type().equals(TxEndedEvent.name())) - .count() > 0; - } - - private boolean isCompensated(String globalTxId, TxEvent event) { - return events.stream() - .filter(e -> globalTxId.equals(e.globalTxId()) - && e.localTxId().equals(event.localTxId()) - && e.type().equals(TxCompensatedEvent.name())) - .count() > 0; - } - }; - - private final List<Command> commands = new ArrayList<>(); - private final CommandRepository commandRepository = new CommandRepository() { - @Override - public boolean exists(String globalTxId, String localTxId) { - return commands.stream() - .anyMatch(command -> globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId())); - } - - @Override - public void saveCompensationCommand(String globalTxId, String localTxId) { - TxEvent event = eventRepository.findFirstTransaction(globalTxId, localTxId, TxStartedEvent.name()); - commands.add(new Command(event)); + public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) { + return emptyList(); } @Override - public void saveCompensationCommands(String globalTxId) { - List<TxEvent> events = eventRepository.findTransactionsToCompensate(globalTxId); - - Map<String, Command> commandMap = new HashMap<>(); - - for (TxEvent event : events) { - commandMap.computeIfAbsent(event.localTxId(), k -> new Command(event)); - } - - commands.addAll(commandMap.values()); - } - - @Override - public void markCommandAsDone(String globalTxId, String localTxId) { - for (int i = 0; i < commands.size(); i++) { - Command command = commands.get(i); - if (globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId())) { - commands.set(i, new Command(command, DONE)); - } - } - } - - @Override - public List<Command> findUncompletedCommands(String globalTxId) { - return commands.stream() - .filter(command -> command.globalTxId().equals(globalTxId) && !DONE.name().equals(command.status())) - .collect(Collectors.toList()); - } - - @Override - public List<Command> findFirstCommandToCompensate() { - List<Command> results = new ArrayList<>(1); - commands.stream() - .filter(command -> !DONE.name().equals(command.status())) - .findFirst() - .ifPresent(results::add); - - return results; + public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) { + return Optional.empty(); } }; @@ -161,20 +73,9 @@ public class TxConsistentServiceTest { private final String instanceId = uniquify("instanceId"); private final String compensationMethod = getClass().getCanonicalName(); - private final List<CompensationContext> compensationContexts = new ArrayList<>(); - private Consumer<TxEvent> eventConsumer = event -> {}; - private final OmegaCallback omegaCallback = event -> { - eventConsumer.accept(event); - compensationContexts.add( - new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads())); - }; - - private final TxConsistentService consistentService = new TxConsistentService( - eventRepository, - commandRepository, - omegaCallback, - 300); + private final TxConsistentService consistentService = new TxConsistentService(eventRepository); + private final byte[] payloads = "yeah".getBytes(); @Test public void persistEventOnArrival() throws Exception { @@ -190,49 +91,6 @@ public class TxConsistentServiceTest { } assertThat(this.events, contains(events)); - assertThat(compensationContexts.isEmpty(), is(true)); - } - - @Test - public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception { - eventConsumer = event -> consistentService - .handle(eventOf(TxCompensatedEvent, new byte[0], event.localTxId(), event.compensationMethod())); - - String localTxId1 = UUID.randomUUID().toString(); - events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a")); - events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a")); - - String localTxId2 = UUID.randomUUID().toString(); - events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2, "method b")); - events.add(eventOf(TxEndedEvent, new byte[0], localTxId2, "method b")); - - TxEvent abortEvent = newEvent(TxAbortedEvent); - - consistentService.handle(abortEvent); - - await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1); - assertThat(compensationContexts, containsInAnyOrder( - new CompensationContext(globalTxId, localTxId1, "method a", "service a".getBytes()), - new CompensationContext(globalTxId, localTxId2, "method b", "service b".getBytes()) - )); - - await().atMost(1, SECONDS).until(() -> events.size() == 8); - assertThat(events.pollLast().type(), is(SagaEndedEvent.name())); - } - - @Test - public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception { - events.add(newEvent(TxStartedEvent)); - events.add(newEvent(TxAbortedEvent)); - - TxEvent event = eventOf(TxEndedEvent, new byte[0], localTxId, compensationMethod); - - consistentService.handle(event); - - await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0); - assertThat(compensationContexts, containsInAnyOrder( - new CompensationContext(globalTxId, localTxId, compensationMethod, "yeah".getBytes()) - )); } @Test @@ -241,7 +99,7 @@ public class TxConsistentServiceTest { events.add(newEvent(TxStartedEvent)); events.add(newEvent(TxAbortedEvent)); - TxEvent event = eventOf(TxStartedEvent, "service x".getBytes(), localTxId1, "method x"); + TxEvent event = eventOf(TxStartedEvent, localTxId1); consistentService.handle(event); @@ -249,10 +107,10 @@ public class TxConsistentServiceTest { } private TxEvent newEvent(EventType eventType) { - return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes()); + return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, payloads); } - private TxEvent eventOf(EventType eventType, byte[] payloads, String localTxId, String compensationMethod) { + private TxEvent eventOf(EventType eventType, String localTxId) { return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, @@ -261,48 +119,4 @@ public class TxConsistentServiceTest { compensationMethod, payloads); } - - private static class CompensationContext { - private final String globalTxId; - private final String localTxId; - private final String compensationMethod; - private final byte[] message; - - private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) { - this.globalTxId = globalTxId; - this.localTxId = localTxId; - this.compensationMethod = compensationMethod; - this.message = message; - } - - @Override - public String toString() { - return "CompensationContext{" + - "globalTxId='" + globalTxId + '\'' + - ", localTxId='" + localTxId + '\'' + - ", compensationMethod='" + compensationMethod + '\'' + - ", message=" + Arrays.toString(message) + - '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - CompensationContext that = (CompensationContext) o; - return Objects.equals(globalTxId, that.globalTxId) && - Objects.equals(localTxId, that.localTxId) && - Objects.equals(compensationMethod, that.compensationMethod) && - Arrays.equals(message, that.message); - } - - @Override - public int hashCode() { - return Objects.hash(globalTxId, localTxId, compensationMethod, message); - } - } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java index 3740581..73298be 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java @@ -28,6 +28,7 @@ import javax.annotation.PostConstruct; import org.apache.servicecomb.saga.alpha.core.CommandRepository; import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback; +import org.apache.servicecomb.saga.alpha.core.EventScanner; import org.apache.servicecomb.saga.alpha.core.OmegaCallback; import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner; import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback; @@ -68,24 +69,27 @@ class AlphaConfig { @Bean ScheduledExecutorService compensationScheduler() { - return Executors.newSingleThreadScheduledExecutor(); + return Executors.newScheduledThreadPool(2); } @Bean TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port, - @Value("${alpha.command.pollingInterval:3000}") int commandPollingInterval, + @Value("${alpha.command.pollingInterval:500}") int commandPollingInterval, + @Value("${alpha.event.pollingInterval:500}") int eventPollingInterval, ScheduledExecutorService scheduler, TxEventRepository eventRepository, CommandRepository commandRepository, OmegaCallback omegaCallback, Map<String, Map<String, OmegaCallback>> omegaCallbacks) { - TxConsistentService consistentService = new TxConsistentService( + new EventScanner(scheduler, eventRepository, commandRepository, omegaCallback, commandPollingInterval, - scheduler); + eventPollingInterval).run(); + + TxConsistentService consistentService = new TxConsistentService(eventRepository); ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks); new Thread(startable::start).start(); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java index 34b43a4..6076e54 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java @@ -20,8 +20,8 @@ package org.apache.servicecomb.saga.alpha.server; import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE; import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW; import static org.apache.servicecomb.saga.alpha.core.CommandStatus.PENDING; -import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; +import java.lang.invoke.MethodHandles; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -29,9 +29,12 @@ import java.util.Map; import org.apache.servicecomb.saga.alpha.core.Command; import org.apache.servicecomb.saga.alpha.core.CommandRepository; import org.apache.servicecomb.saga.alpha.core.TxEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.data.domain.PageRequest; public class SpringCommandRepository implements CommandRepository { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final PageRequest SINGLE_COMMAND_REQUEST = new PageRequest(0, 1); private final TxEventEnvelopeRepository eventRepository; @@ -43,22 +46,7 @@ public class SpringCommandRepository implements CommandRepository { } @Override - public boolean exists(String globalTxId, String localTxId) { - return commandRepository.findByGlobalTxIdAndLocalTxId(globalTxId, localTxId).isPresent(); - } - - @Override - public void saveCompensationCommand(String globalTxId, String localTxId) { - TxEvent startedEvent = eventRepository.findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc( - globalTxId, - localTxId, - TxStartedEvent.name()); - - commandRepository.save(new Command(startedEvent)); - } - - @Override - public void saveCompensationCommands(String globalTxId) { + public Iterable<Command> saveCompensationCommands(String globalTxId) { List<TxEvent> events = eventRepository .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId); @@ -68,7 +56,8 @@ public class SpringCommandRepository implements CommandRepository { commands.computeIfAbsent(event.localTxId(), k -> new Command(event)); } - commandRepository.save(commands.values()); + log.info("Saving compensation commands {}", commands.values()); + return commandRepository.save(commands.values()); } @Override diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java index 7c44639..4108aa5 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java @@ -18,9 +18,11 @@ package org.apache.servicecomb.saga.alpha.server; import java.util.List; +import java.util.Optional; import org.apache.servicecomb.saga.alpha.core.TxEvent; import org.apache.servicecomb.saga.alpha.core.TxEventRepository; +import org.springframework.data.domain.PageRequest; class SpringTxEventRepository implements TxEventRepository { private final TxEventEnvelopeRepository eventRepo; @@ -40,12 +42,12 @@ class SpringTxEventRepository implements TxEventRepository { } @Override - public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) { - return eventRepo.findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(globalTxId, localTxId, type); + public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) { + return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, new PageRequest(0, 1)); } @Override - public List<TxEvent> findTransactionsToCompensate(String globalTxId) { - return eventRepo.findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId); + public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) { + return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id); } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index 09aec6f..78c2a1d 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -18,8 +18,10 @@ package org.apache.servicecomb.saga.alpha.server; import java.util.List; +import java.util.Optional; import org.apache.servicecomb.saga.alpha.core.TxEvent; +import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; @@ -32,13 +34,8 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { + "WHERE t.globalTxId = ?1 AND t.type = ?2") List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type); - TxEvent findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(String globalTxId, String localTxId, String type); - - @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent(" - + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads" - + ") FROM TxEvent t " - + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent'" - + "AND EXISTS (" + @Query("SELECT t FROM TxEvent t " + + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( " + " SELECT t1.globalTxId" + " FROM TxEvent t1 " + " WHERE t1.globalTxId = ?1 " @@ -50,19 +47,15 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { + " WHERE t2.globalTxId = ?1 " + " AND t2.localTxId = t.localTxId " + " AND t2.type = 'TxCompensatedEvent') " - + "ORDER BY t.surrogateId ASC" - ) - List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId); + + "ORDER BY t.surrogateId ASC") + List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId); - @Query("SELECT new org.apache.servicecomb.saga.alpha.core.TxEvent(" - + "t.surrogateId, t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads" - + ") FROM TxEvent t " - + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( " + @Query("SELECT t FROM TxEvent t " + + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?2 AND EXISTS ( " + " SELECT t1.globalTxId" + " FROM TxEvent t1 " - + " WHERE t1.globalTxId = ?1 " - + " AND t1.localTxId = t.localTxId " - + " AND t1.type = 'TxEndedEvent'" + + " WHERE t1.globalTxId = t.globalTxId " + + " AND t1.type = 'TxAbortedEvent'" + ") AND NOT EXISTS ( " + " SELECT t2.globalTxId" + " FROM TxEvent t2 " @@ -70,5 +63,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { + " AND t2.localTxId = t.localTxId " + " AND t2.type = 'TxCompensatedEvent') " + "ORDER BY t.surrogateId ASC") - List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId); + List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable); + + Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId); } diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index 1bfbe3c..5cff57c 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -34,8 +34,6 @@ import java.util.Map; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; import org.apache.servicecomb.saga.alpha.core.OmegaCallback; @@ -57,8 +55,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; import org.springframework.test.context.junit4.SpringRunner; import com.google.protobuf.ByteString; @@ -69,7 +65,7 @@ import io.grpc.stub.StreamObserver; @RunWith(SpringRunner.class) @SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, - properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1"}) + properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=100"}) public class AlphaIntegrationTest { private static final int port = 8090; @@ -417,10 +413,4 @@ public class AlphaIntegrationTest { return completed; } } - - @Primary - @Bean - ScheduledExecutorService scheduler() { - return Executors.newScheduledThreadPool(2); - } } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
