This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit bc3879f204e41dc6bb9bc9f290a12eb524202f51 Author: Eric Lee <dagang...@huawei.com> AuthorDate: Mon Feb 5 23:02:29 2018 +0800 SCB-239 handle timeout in EventScanner Signed-off-by: Eric Lee <dagang...@huawei.com> --- .../servicecomb/saga/alpha/core/EventScanner.java | 58 +++++++++---- .../saga/alpha/core/TxConsistentService.java | 35 +------- .../servicecomb/saga/alpha/core/TxEvent.java | 39 ++++++--- .../saga/alpha/core/TxEventRepository.java | 4 + .../servicecomb/saga/alpha/core/TxTimeout.java | 50 ++++++++--- .../saga/alpha/core/TxTimeoutRepository.java | 6 +- .../saga/alpha/core/TxConsistentServiceTest.java | 81 ++++-------------- .../servicecomb/saga/alpha/server/AlphaConfig.java | 2 +- .../saga/alpha/server/SpringTxEventRepository.java | 10 +++ .../alpha/server/SpringTxTimeoutRepository.java | 26 ++---- .../alpha/server/TxEventEnvelopeRepository.java | 16 ++++ .../alpha/server/TxTimeoutEntityRepository.java | 36 ++++---- .../src/main/resources/schema-postgresql.sql | 13 ++- .../saga/alpha/server/AlphaIntegrationTest.java | 7 +- alpha/alpha-server/src/test/resources/schema.sql | 11 ++- .../omega/transaction/OnceAwareInterceptor.java | 49 ----------- .../saga/omega/transaction/TransactionAspect.java | 1 - .../transaction/OnceAwareInterceptorTest.java | 98 ---------------------- 18 files changed, 202 insertions(+), 340 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java index 4f72a1c..a52ebe5 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java @@ -18,6 +18,7 @@ package org.apache.servicecomb.saga.alpha.core; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; @@ -66,7 +67,9 @@ public class EventScanner implements Runnable { private void pollEvents() { scheduler.scheduleWithFixedDelay( () -> { - abortTimeoutEvent(); + updateTimeoutStatus(); + findTimeoutEvents(); + abortTimeoutEvents(); saveUncompensatedEventsToCommands(); compensate(); updateCompensatedCommands(); @@ -78,6 +81,18 @@ public class EventScanner implements Runnable { MILLISECONDS); } + private void findTimeoutEvents() { + eventRepository.findTimeoutEvents() + .forEach(event -> { + log.info("Found timeout event {}", event); + timeoutRepository.save(txTimeoutOf(event)); + }); + } + + private void updateTimeoutStatus() { + timeoutRepository.markTimeoutAsDone(); + } + private void saveUncompensatedEventsToCommands() { eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name()) .forEach(event -> { @@ -113,15 +128,15 @@ public class EventScanner implements Runnable { markSagaEnded(event); } - private void abortTimeoutEvent() { - timeoutRepository.findFirstTimeoutTxToAbort().forEach(event -> { - log.info("Found timeout event {}", event); + private void abortTimeoutEvents() { + timeoutRepository.findFirstTimeout().forEach(timeout -> { + log.info("Found timeout event {} to abort", timeout); - eventRepository.save(toTxAbortedEvent(event)); - timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId()); + eventRepository.save(toTxAbortedEvent(timeout)); - if (event.type().equals(TxStartedEvent.name())) { - omegaCallback.compensate(event); + if (timeout.type().equals(TxStartedEvent.name())) { + eventRepository.findTxStartedEventToCompensate(timeout.globalTxId(), timeout.localTxId()) + .ifPresent(omegaCallback::compensate); } }); } @@ -138,17 +153,16 @@ public class EventScanner implements Runnable { private void markGlobalTxEnd(TxEvent event) { eventRepository.save(toSagaEndedEvent(event)); - timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId()); log.info("Marked end of transaction with globalTxId {}", event.globalTxId()); } - private TxEvent toTxAbortedEvent(TxEvent event) { + private TxEvent toTxAbortedEvent(TxTimeout timeout) { return new TxEvent( - event.serviceName(), - event.instanceId(), - event.globalTxId(), - event.localTxId(), - event.parentTxId(), + timeout.serviceName(), + timeout.instanceId(), + timeout.globalTxId(), + timeout.localTxId(), + timeout.parentTxId(), TxAbortedEvent.name(), "", ("Transaction timeout").getBytes()); @@ -189,4 +203,18 @@ public class EventScanner implements Runnable { command.payloads() ); } + + private TxTimeout txTimeoutOf(TxEvent event) { + return new TxTimeout( + event.id(), + event.serviceName(), + event.instanceId(), + event.globalTxId(), + event.localTxId(), + event.parentTxId(), + event.type(), + event.expiryTime(), + NEW.name() + ); + } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java index 541d54f..c55090a 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java @@ -17,18 +17,10 @@ package org.apache.servicecomb.saga.alpha.core; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW; -import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; -import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; -import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.lang.invoke.MethodHandles; -import java.util.Arrays; -import java.util.Date; -import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,12 +29,9 @@ public class TxConsistentService { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final TxEventRepository eventRepository; - private final TxTimeoutRepository timeoutRepository; - public TxConsistentService(TxEventRepository eventRepository, - TxTimeoutRepository timeoutRepository) { + public TxConsistentService(TxEventRepository eventRepository) { this.eventRepository = eventRepository; - this.timeoutRepository = timeoutRepository; } public boolean handle(TxEvent event) { @@ -51,33 +40,11 @@ public class TxConsistentService { return false; } - if (isEventWithTimeout(event)) { - saveTxTimeout(event); - } - eventRepository.save(event); - if (Arrays.asList(TxEndedEvent.name(), SagaEndedEvent.name(), TxAbortedEvent.name()).contains(event.type())) { - CompletableFuture.runAsync(() -> timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId())); - } - return true; } - private boolean isEventWithTimeout(TxEvent event) { - return Arrays.asList(TxStartedEvent.name(), SagaStartedEvent.name()).contains(event.type()) && event.timeout() != 0; - } - - private void saveTxTimeout(TxEvent event) { - Date expireTime = new Date(event.creationTime().getTime() + SECONDS.toMillis(event.timeout())); - timeoutRepository.save( - new TxTimeout( - event.globalTxId(), - event.localTxId(), - expireTime, - NEW.name())); - } - private boolean isGlobalTxAborted(TxEvent event) { return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty(); } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java index e34b7c6..42a202f 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java @@ -17,6 +17,8 @@ package org.apache.servicecomb.saga.alpha.core; +import static java.util.concurrent.TimeUnit.SECONDS; + import java.util.Date; import javax.persistence.Entity; @@ -24,10 +26,12 @@ import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.Transient; -import javax.persistence.Version; @Entity public class TxEvent { + @Transient + private static final long MAX_TIMESTAMP = 253402214400000L; // 9999-12-31 00:00:00 + @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long surrogateId; @@ -40,14 +44,9 @@ public class TxEvent { private String parentTxId; private String type; private String compensationMethod; + private Date expiryTime; private byte[] payloads; - @Version - private long version; - - @Transient - private int timeout; - private TxEvent() { } @@ -61,7 +60,7 @@ public class TxEvent { event.parentTxId, event.type, event.compensationMethod, - 0, + event.expiryTime, event.payloads); } @@ -117,7 +116,23 @@ public class TxEvent { String compensationMethod, int timeout, byte[] payloads) { + this(surrogateId, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, + compensationMethod, + timeout == 0 ? new Date(MAX_TIMESTAMP) : new Date(creationTime.getTime() + SECONDS.toMillis(timeout)), + payloads); + } + TxEvent(Long surrogateId, + String serviceName, + String instanceId, + Date creationTime, + String globalTxId, + String localTxId, + String parentTxId, + String type, + String compensationMethod, + Date expiryTime, + byte[] payloads) { this.surrogateId = surrogateId; this.serviceName = serviceName; this.instanceId = instanceId; @@ -127,8 +142,8 @@ public class TxEvent { this.parentTxId = parentTxId; this.type = type; this.compensationMethod = compensationMethod; + this.expiryTime = expiryTime; this.payloads = payloads; - this.timeout = timeout; } public String serviceName() { @@ -171,8 +186,8 @@ public class TxEvent { return surrogateId; } - public int timeout() { - return timeout; + public Date expiryTime() { + return expiryTime; } @Override @@ -187,7 +202,7 @@ public class TxEvent { ", parentTxId='" + parentTxId + '\'' + ", type='" + type + '\'' + ", compensationMethod='" + compensationMethod + '\'' + - ", timeout='" + timeout + '\'' + + ", expiryTime='" + expiryTime + '\'' + '}'; } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java index b974bd9..0af6fb5 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java @@ -25,6 +25,10 @@ public interface TxEventRepository { Optional<TxEvent> findFirstAbortedGlobalTransaction(); + List<TxEvent> findTimeoutEvents(); + + Optional<TxEvent> findTxStartedEventToCompensate(String globalTxId, String localTxId); + List<TxEvent> findTransactions(String globalTxId, String type); List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type); diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java index dc365e3..00ca2ec 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java @@ -31,9 +31,14 @@ public class TxTimeout { @GeneratedValue(strategy = GenerationType.IDENTITY) private Long surrogateId; + private long eventId; + private String serviceName; + private String instanceId; private String globalTxId; private String localTxId; - private Date expireTime; + private String parentTxId; + private String type; + private Date expiryTime; private String status; @Version @@ -42,13 +47,27 @@ public class TxTimeout { TxTimeout() { } - public TxTimeout(String globalTxId, String localTxId, Date expireTime, String status) { + TxTimeout(long eventId, String serviceName, String instanceId, String globalTxId, String localTxId, + String parentTxId, String type, Date expiryTime, String status) { + this.eventId = eventId; + this.serviceName = serviceName; + this.instanceId = instanceId; this.globalTxId = globalTxId; this.localTxId = localTxId; - this.expireTime = expireTime; + this.parentTxId = parentTxId; + this.type = type; + this.expiryTime = expiryTime; this.status = status; } + public String serviceName() { + return serviceName; + } + + public String instanceId() { + return instanceId; + } + public String globalTxId() { return globalTxId; } @@ -57,24 +76,33 @@ public class TxTimeout { return localTxId; } - public Date expireTime() { - return expireTime; + public String parentTxId() { + return parentTxId; } - public String status() { - return status; + public String type() { + return type; } - public void setStatus(String status) { - this.status = status; + public Date expiryTime() { + return expiryTime; + } + + public String status() { + return status; } @Override public String toString() { return "TxTimeout{" + - "globalTxId='" + globalTxId + '\'' + + "eventId=" + eventId + + ", serviceName='" + serviceName + '\'' + + ", instanceId='" + instanceId + '\'' + + ", globalTxId='" + globalTxId + '\'' + ", localTxId='" + localTxId + '\'' + - ", expireTime=" + expireTime + + ", parentTxId='" + parentTxId + '\'' + + ", type='" + type + '\'' + + ", expiryTime=" + expiryTime + ", status=" + status + '}'; } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java index 88758c7..97387a3 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java @@ -20,9 +20,9 @@ package org.apache.servicecomb.saga.alpha.core; import java.util.List; public interface TxTimeoutRepository { - void save(TxTimeout event); + void save(TxTimeout timeout); - void markTxTimeoutAsDone(String globalTxId, String localTxId); + void markTimeoutAsDone(); - List<TxEvent> findFirstTimeoutTxToAbort(); + List<TxTimeout> findFirstTimeout(); } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index 5467368..d220994 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -19,15 +19,12 @@ package org.apache.servicecomb.saga.alpha.core; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static java.util.Collections.emptyList; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; -import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.contains; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; @@ -57,6 +54,18 @@ public class TxConsistentServiceTest { } @Override + public List<TxEvent> findTimeoutEvents() { + return emptyList(); + } + + @Override + public Optional<TxEvent> findTxStartedEventToCompensate(String globalTxId, String localTxId) { + return events.stream() + .filter(event -> globalTxId.equals(event.globalTxId()) && localTxId.equals(event.localTxId())) + .findFirst(); + } + + @Override public List<TxEvent> findTransactions(String globalTxId, String type) { return events.stream() .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type())) @@ -78,29 +87,6 @@ public class TxConsistentServiceTest { } }; - private final Deque<TxTimeout> timeouts = new ConcurrentLinkedDeque<>(); - private final TxTimeoutRepository timeoutRepository = new TxTimeoutRepository() { - @Override - public void save(TxTimeout timeout) { - timeouts.add(timeout); - } - - @Override - public void markTxTimeoutAsDone(String globalTxId, String localTxId) { - for (TxTimeout timeout : timeouts) { - if (timeout.globalTxId().equals(globalTxId) && timeout.localTxId().equals(localTxId)) { - timeout.setStatus(DONE.name()); - break; - } - } - } - - @Override - public List<TxEvent> findFirstTimeoutTxToAbort() { - return null; - } - }; - private final String globalTxId = UUID.randomUUID().toString(); private final String localTxId = UUID.randomUUID().toString(); private final String parentTxId = UUID.randomUUID().toString(); @@ -109,13 +95,12 @@ public class TxConsistentServiceTest { private final String compensationMethod = getClass().getCanonicalName(); - private final TxConsistentService consistentService = new TxConsistentService(eventRepository, timeoutRepository); + private final TxConsistentService consistentService = new TxConsistentService(eventRepository); private final byte[] payloads = "yeah".getBytes(); @Before public void setUp() throws Exception { events.clear(); - timeouts.clear(); } @Test @@ -132,7 +117,6 @@ public class TxConsistentServiceTest { } assertThat(this.events, contains(events)); - assertThat(timeouts.isEmpty(), is(true)); } @Test @@ -146,46 +130,11 @@ public class TxConsistentServiceTest { consistentService.handle(event); assertThat(events.size(), is(2)); - assertThat(timeouts.isEmpty(), is(true)); - } - - @Test - public void persistTimeoutEventOnArrival() { - TxEvent[] events = { - newEventWithTimeout(SagaStartedEvent, globalTxId,2), - newEventWithTimeout(TxStartedEvent, 1), - newEvent(TxEndedEvent), - newEvent(TxCompensatedEvent), - eventOf(SagaEndedEvent, globalTxId)}; - - for (TxEvent event : events) { - consistentService.handle(event); - } - - assertThat(this.events, contains(events)); - assertThat(timeouts.size(), is(2)); - await().atMost(1, SECONDS).until(this::allTimeoutIsDone); - } - - private boolean allTimeoutIsDone() { - for (TxTimeout timeout : timeouts) { - if (!timeout.status().equals(DONE.name())) { - return false; - } - } - return true; } private TxEvent newEvent(EventType eventType) { - return newEventWithTimeout(eventType, 0); - } - - private TxEvent newEventWithTimeout(EventType eventType, int timeout) { - return newEventWithTimeout(eventType, localTxId, timeout); - } - - private TxEvent newEventWithTimeout(EventType eventType, String localTxId, int timeout) { - return new TxEvent(serviceName, instanceId, globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, timeout, payloads); + return new TxEvent(serviceName, instanceId, globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, + payloads); } private TxEvent eventOf(EventType eventType, String localTxId) { diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java index 9472d0d..6889c9f 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java @@ -94,7 +94,7 @@ class AlphaConfig { eventRepository, commandRepository, timeoutRepository, omegaCallback, eventPollingInterval).run(); - TxConsistentService consistentService = new TxConsistentService(eventRepository, timeoutRepository); + TxConsistentService consistentService = new TxConsistentService(eventRepository); ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks); new Thread(startable::start).start(); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java index 5531d8f..d6ea21c 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java @@ -43,6 +43,16 @@ class SpringTxEventRepository implements TxEventRepository { } @Override + public List<TxEvent> findTimeoutEvents() { + return eventRepo.findTimeoutEvents(SINGLE_TX_EVENT_REQUEST); + } + + @Override + public Optional<TxEvent> findTxStartedEventToCompensate(String globalTxId, String localTxId) { + return eventRepo.findFirstStartedEventByGlobalTxIdAndLocalTxId(globalTxId, localTxId); + } + + @Override public List<TxEvent> findTransactions(String globalTxId, String type) { return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java index 71c808d..ee75496 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java @@ -17,15 +17,12 @@ package org.apache.servicecomb.saga.alpha.server; -import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE; import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING; -import java.util.ArrayList; import java.util.List; import javax.transaction.Transactional; -import org.apache.servicecomb.saga.alpha.core.TxEvent; import org.apache.servicecomb.saga.alpha.core.TxTimeout; import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository; import org.springframework.data.domain.PageRequest; @@ -38,26 +35,21 @@ public class SpringTxTimeoutRepository implements TxTimeoutRepository { } @Override - public void save(TxTimeout event) { - timeoutRepo.save(event); + public void save(TxTimeout timeout) { + timeoutRepo.save(timeout); } @Override - public void markTxTimeoutAsDone(String globalTxId, String localTxId) { - timeoutRepo.updateStatusByGlobalTxIdAndLocalTxId(DONE.name(), globalTxId, localTxId); + public void markTimeoutAsDone() { + timeoutRepo.updateStatusOfFinishedTx(); } @Transactional @Override - public List<TxEvent> findFirstTimeoutTxToAbort() { - List<TxEvent> timeoutEvents = timeoutRepo.findFirstTimeoutTxOrderByExpireTimeAsc(new PageRequest(0, 1)); - List<TxEvent> pendingTimeoutEvents = new ArrayList<>(); - timeoutEvents.forEach(event -> { - if (timeoutRepo.updateStatusFromNewByGlobalTxIdAndLocalTxId(PENDING.name(), event.globalTxId(), event.localTxId()) - != 0) { - pendingTimeoutEvents.add(event); - } - }); - return pendingTimeoutEvents; + public List<TxTimeout> findFirstTimeout() { + List<TxTimeout> timeoutEvents = timeoutRepo.findFirstTimeoutTxOrderByExpireTimeAsc(new PageRequest(0, 1)); + timeoutEvents.forEach(event -> timeoutRepo + .updateStatusByGlobalTxIdAndLocalTxId(PENDING.name(), event.globalTxId(), event.localTxId())); + return timeoutEvents; } } \ No newline at end of file diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index c4984f9..0eaf089 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -38,6 +38,22 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { + " AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent'))") Optional<TxEvent> findFirstAbortedGlobalTxByType(); + @Query("SELECT t FROM TxEvent t " + + "WHERE t.type IN ('TxStartedEvent', 'SagaStartedEvent') " + + " AND t.expiryTime < CURRENT_TIMESTAMP AND NOT EXISTS( " + + " SELECT t1.globalTxId FROM TxEvent t1 " + + " WHERE t1.globalTxId = t.globalTxId " + + " AND t1.localTxId = t.localTxId " + + " AND t1.type != t.type" + + ")") + List<TxEvent> findTimeoutEvents(Pageable pageable); + + @Query("SELECT t FROM TxEvent t " + + "WHERE t.globalTxId = ?1 " + + " AND t.localTxId = ?2 " + + " AND t.type = 'TxStartedEvent'") + Optional<TxEvent> findFirstStartedEventByGlobalTxIdAndLocalTxId(String globalTxId, String localTxId); + @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent(" + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, " + "t.type, t.compensationMethod, t.payloads" diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java index cc39397..f0e264a 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java @@ -22,7 +22,6 @@ import java.util.List; import javax.persistence.LockModeType; import javax.transaction.Transactional; -import org.apache.servicecomb.saga.alpha.core.TxEvent; import org.apache.servicecomb.saga.alpha.core.TxTimeout; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.Lock; @@ -44,25 +43,22 @@ interface TxTimeoutEntityRepository extends CrudRepository<TxTimeout, Long> { @Param("globalTxId") String globalTxId, @Param("localTxId") String localTxId); + @Lock(LockModeType.OPTIMISTIC) + @Query("SELECT t FROM TxTimeout AS t " + + "WHERE t.status = 'NEW' " + + " AND t.expiryTime < CURRENT_TIMESTAMP " + + "ORDER BY t.expiryTime ASC") + List<TxTimeout> findFirstTimeoutTxOrderByExpireTimeAsc(Pageable pageable); + @Transactional @Modifying(clearAutomatically = true) - @Query("UPDATE org.apache.servicecomb.saga.alpha.core.TxTimeout t " - + "SET t.status = :status " - + "WHERE t.globalTxId = :globalTxId " - + " AND t.localTxId = :localTxId " - + " AND t.status = 'NEW'") - int updateStatusFromNewByGlobalTxIdAndLocalTxId( - @Param("status") String status, - @Param("globalTxId") String globalTxId, - @Param("localTxId") String localTxId); - - @Lock(LockModeType.OPTIMISTIC) - @Query("SELECT te FROM TxEvent AS te " - + "INNER JOIN TxTimeout AS tt " - + "ON te.globalTxId = tt.globalTxId " - + " AND te.localTxId = tt.localTxId " - + " AND tt.status = 'NEW' " - + " AND tt.expireTime < CURRENT_TIMESTAMP " - + "ORDER BY tt.expireTime ASC") - List<TxEvent> findFirstTimeoutTxOrderByExpireTimeAsc(Pageable pageable); + @Query("UPDATE TxTimeout t " + + "SET t.status = 'DONE' " + + "WHERE t.status != 'DONE' AND EXISTS (" + + " SELECT t1.globalTxId FROM TxEvent t1 " + + " WHERE t1.globalTxId = t.globalTxId " + + " AND t1.localTxId = t.localTxId " + + " AND t1.type != t.type" + + ")") + void updateStatusOfFinishedTx(); } diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql index 484c5e3..e7f774b 100644 --- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql +++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql @@ -8,8 +8,8 @@ CREATE TABLE IF NOT EXISTS TxEvent ( parentTxId varchar(36) DEFAULT NULL, type varchar(50) NOT NULL, compensationMethod varchar(256) NOT NULL, - payloads bytea, - version bigint NOT NULL + expiryTime timestamp(6) NOT NULL, + payloads bytea ); CREATE INDEX IF NOT EXISTS saga_events_index ON TxEvent (surrogateId, globalTxId, localTxId, type); @@ -35,11 +35,16 @@ CREATE INDEX IF NOT EXISTS saga_commands_index ON Command (surrogateId, eventId, CREATE TABLE IF NOT EXISTS TxTimeout ( surrogateId BIGSERIAL PRIMARY KEY, + eventId bigint NOT NULL UNIQUE, + serviceName varchar(16) NOT NULL, + instanceId varchar(36) NOT NULL, globalTxId varchar(36) NOT NULL, localTxId varchar(36) NOT NULL, - expireTime TIMESTAMP NOT NULL, + parentTxId varchar(36) DEFAULT NULL, + type varchar(50) NOT NULL, + expiryTime TIMESTAMP NOT NULL, status varchar(12), version bigint NOT NULL ); -CREATE INDEX IF NOT EXISTS saga_timeouts_index ON TxTimeout (surrogateId, expireTime, globalTxId, localTxId, status); +CREATE INDEX IF NOT EXISTS saga_timeouts_index ON TxTimeout (surrogateId, expiryTime, globalTxId, localTxId, status); diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index 225c194..497c244 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -20,7 +20,6 @@ package org.apache.servicecomb.saga.alpha.server; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE; -import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; @@ -383,10 +382,6 @@ public class AlphaIntegrationTest { asyncStub.onConnected(serviceConfig, compensateResponseObserver); blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1)); - await().atMost(1, SECONDS).until(() -> timeoutEntityRepository.count() == 1L); - Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll(); - timeouts.forEach(timeout -> assertThat(timeout.status(), is(NEW.name()))); - await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3); List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); @@ -395,7 +390,7 @@ public class AlphaIntegrationTest { assertThat(events.get(2).type(), is(SagaEndedEvent.name())); assertThat(timeoutEntityRepository.count(), is(1L)); - timeouts = timeoutEntityRepository.findAll(); + Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll(); timeouts.forEach(timeout -> { assertThat(timeout.status(), is(DONE.name())); assertThat(timeout.globalTxId(), is(globalTxId)); diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql index 0958389..929c69f 100644 --- a/alpha/alpha-server/src/test/resources/schema.sql +++ b/alpha/alpha-server/src/test/resources/schema.sql @@ -8,8 +8,8 @@ CREATE TABLE IF NOT EXISTS TxEvent ( parentTxId varchar(36) DEFAULT NULL, type varchar(50) NOT NULL, compensationMethod varchar(256) NOT NULL, - payloads varbinary(10240), - version bigint NOT NULL + expiryTime TIMESTAMP NOT NULL, + payloads varbinary(10240) ); CREATE TABLE IF NOT EXISTS Command ( @@ -29,9 +29,14 @@ CREATE TABLE IF NOT EXISTS Command ( CREATE TABLE IF NOT EXISTS TxTimeout ( surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY, + eventId bigint NOT NULL UNIQUE, + serviceName varchar(36) NOT NULL, + instanceId varchar(36) NOT NULL, globalTxId varchar(36) NOT NULL, localTxId varchar(36) NOT NULL, - expireTime TIMESTAMP NOT NULL, + parentTxId varchar(36) DEFAULT NULL, + type varchar(50) NOT NULL, + expiryTime TIMESTAMP NOT NULL, status varchar(12), version bigint NOT NULL ); diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java deleted file mode 100644 index 3015a01..0000000 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.servicecomb.saga.omega.transaction; - -import java.util.concurrent.atomic.AtomicReference; - -class OnceAwareInterceptor implements EventAwareInterceptor { - private final EventAwareInterceptor interceptor; - private final AtomicReference<EventAwareInterceptor> interceptorRef; - - OnceAwareInterceptor(EventAwareInterceptor interceptor) { - this.interceptor = interceptor; - this.interceptorRef = new AtomicReference<>(interceptor); - } - - @Override - public AlphaResponse preIntercept(String parentTxId, String signature, int timeout, Object... args) { - return interceptor.preIntercept(parentTxId, signature, timeout, args); - } - - @Override - public void postIntercept(String parentTxId, String signature) { - if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) { - interceptor.postIntercept(parentTxId, signature); - } - } - - @Override - public void onError(String parentTxId, String signature, Throwable throwable) { - if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) { - interceptor.onError(parentTxId, signature, throwable); - } - } -} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java index 090fe2e..932b990 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java @@ -54,7 +54,6 @@ public class TransactionAspect { String localTxId = context.localTxId(); context.newLocalTxId(); - OnceAwareInterceptor interceptor = new OnceAwareInterceptor(this.interceptor); AlphaResponse response = interceptor.preIntercept(localTxId, signature, compensable.timeout(), joinPoint.getArgs()); if (response.aborted()) { String abortedLocalTxId = context.localTxId(); diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java deleted file mode 100644 index 90a133b..0000000 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.servicecomb.saga.omega.transaction; - -import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Test; - -public class OnceAwareInterceptorTest { - private static final int runningCounts = 1000; - - private final String localTxId = uniquify("localTxId"); - private final String signature = uniquify("signature"); - - private final AtomicInteger postInterceptInvoked = new AtomicInteger(); - private final AtomicInteger onErrorInvoked = new AtomicInteger(); - - private final EventAwareInterceptor underlying = new EventAwareInterceptor() { - @Override - public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) { - return new AlphaResponse(false); - } - - @Override - public void postIntercept(String parentTxId, String compensationMethod) { - postInterceptInvoked.incrementAndGet(); - } - - @Override - public void onError(String parentTxId, String compensationMethod, Throwable throwable) { - onErrorInvoked.incrementAndGet(); - } - }; - - private final ExecutorService executorService = Executors.newFixedThreadPool(2); - - @Test - public void invokePostIntercept() throws Exception { - List<Future<?>> futures = new LinkedList<>(); - - for (int i = 0; i < runningCounts; i++) { - OnceAwareInterceptor interceptor = new OnceAwareInterceptor(underlying); - - futures.add(executorService.submit(() -> interceptor.postIntercept(localTxId, signature))); - } - - waitTillAllDone(futures); - - assertThat(postInterceptInvoked.get(), is(runningCounts)); - } - - @Test - public void invokeOnErrorConcurrently() throws Exception { - RuntimeException oops = new RuntimeException("oops"); - List<Future<?>> futures = new LinkedList<>(); - - for (int i = 0; i < runningCounts; i++) { - OnceAwareInterceptor interceptor = new OnceAwareInterceptor(underlying); - - futures.add(executorService.submit(() -> interceptor.onError(localTxId, signature, oops))); - } - - waitTillAllDone(futures); - - assertThat(onErrorInvoked.get(), is(runningCounts)); - } - - private void waitTillAllDone(List<Future<?>> futures) - throws InterruptedException, java.util.concurrent.ExecutionException { - for (Future<?> future : futures) { - future.get(); - } - } -} -- To stop receiving notification emails like this one, please contact ningji...@apache.org.