This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 82ab5382df266a1566f38dcd0251fd29fe06d9f0 Author: Eric Lee <dagang...@huawei.com> AuthorDate: Mon Jan 29 11:33:34 2018 +0800 SCB-239 avoid duplicate abort events in concurrent processing Signed-off-by: Eric Lee <dagang...@huawei.com> --- .../servicecomb/saga/alpha/core/Command.java | 2 +- .../servicecomb/saga/alpha/core/EventScanner.java | 24 +++---- .../core/{CommandStatus.java => TaskStatus.java} | 2 +- .../saga/alpha/core/TxConsistentService.java | 35 +++++++++- .../servicecomb/saga/alpha/core/TxEvent.java | 39 ++++++----- .../saga/alpha/core/TxEventRepository.java | 4 +- .../servicecomb/saga/alpha/core/TxTimeout.java | 81 ++++++++++++++++++++++ .../saga/alpha/core/TxTimeoutRepository.java | 14 ++-- .../alpha/core/CompositeOmegaCallbackTest.java | 3 - .../saga/alpha/core/TxConsistentServiceTest.java | 74 +++++++++++++++++--- .../servicecomb/saga/alpha/core/TxEventMaker.java | 3 - .../servicecomb/saga/alpha/server/AlphaConfig.java | 15 ++-- .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 6 +- .../saga/alpha/server/SpringCommandRepository.java | 8 +-- .../saga/alpha/server/SpringTxEventRepository.java | 12 ++-- .../alpha/server/SpringTxTimeoutRepository.java | 63 +++++++++++++++++ .../alpha/server/TxEventEnvelopeRepository.java | 21 ++---- .../alpha/server/TxTimeoutEntityRepository.java | 68 ++++++++++++++++++ .../src/main/resources/schema-postgresql.sql | 16 ++++- .../saga/alpha/server/AlphaIntegrationTest.java | 40 ++++++++--- alpha/alpha-server/src/test/resources/schema.sql | 12 +++- .../saga/omega/transaction/SagaStartAspect.java | 5 +- .../transaction/OnceAwareInterceptorTest.java | 7 +- 23 files changed, 439 insertions(+), 115 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java index 49c1756..1e6f21b 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java @@ -17,7 +17,7 @@ package org.apache.servicecomb.saga.alpha.core; -import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW; import java.util.Date; diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java index 0980f3a..c10c09a 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java @@ -25,7 +25,6 @@ import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.lang.invoke.MethodHandles; -import java.util.Arrays; import java.util.concurrent.ScheduledExecutorService; import org.slf4j.Logger; @@ -38,22 +37,23 @@ public class EventScanner implements Runnable { private final ScheduledExecutorService scheduler; private final TxEventRepository eventRepository; private final CommandRepository commandRepository; + private final TxTimeoutRepository timeoutRepository; private final OmegaCallback omegaCallback; private final int eventPollingInterval; private long nextEndedEventId; private long nextCompensatedEventId; - private long nextTimeoutEventId; public EventScanner(ScheduledExecutorService scheduler, TxEventRepository eventRepository, CommandRepository commandRepository, + TxTimeoutRepository timeoutRepository, OmegaCallback omegaCallback, int eventPollingInterval) { - this.scheduler = scheduler; this.eventRepository = eventRepository; this.commandRepository = commandRepository; + this.timeoutRepository = timeoutRepository; this.omegaCallback = omegaCallback; this.eventPollingInterval = eventPollingInterval; } @@ -70,7 +70,7 @@ public class EventScanner implements Runnable { saveUncompensatedEventsToCommands(); compensate(); updateCompensatedCommands(); - deleteDuplicateEvents(); + deleteDuplicateSagaEndedEvents(); updateTransactionStatus(); }, 0, @@ -96,9 +96,9 @@ public class EventScanner implements Runnable { }); } - private void deleteDuplicateEvents() { + private void deleteDuplicateSagaEndedEvents() { try { - eventRepository.deleteDuplicateEvents(Arrays.asList(TxAbortedEvent.name(), SagaEndedEvent.name())); + eventRepository.deleteDuplicateEvents(SagaEndedEvent.name()); } catch (Exception e) { log.warn("Failed to delete duplicate event", e); } @@ -114,11 +114,11 @@ public class EventScanner implements Runnable { } private void abortTimeoutEvent() { - eventRepository.findFirstTimeoutEventByIdGreaterThan(nextTimeoutEventId) - .ifPresent((TxEvent event) -> { + timeoutRepository.findFirstTimeoutTxToAbort().forEach(event -> { log.info("Found timeout event {}", event); - nextTimeoutEventId = event.id(); + eventRepository.save(toTxAbortedEvent(event)); + timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId()); }); } @@ -134,6 +134,7 @@ public class EventScanner implements Runnable { private void markGlobalTxEnd(TxEvent event) { eventRepository.save(toSagaEndedEvent(event)); + timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId()); log.info("Marked end of transaction with globalTxId {}", event.globalTxId()); } @@ -146,8 +147,7 @@ public class EventScanner implements Runnable { event.parentTxId(), TxAbortedEvent.name(), "", - null, - EMPTY_PAYLOAD); + ("Transaction timeout").getBytes()); } private TxEvent toSagaEndedEvent(TxEvent event) { @@ -159,7 +159,6 @@ public class EventScanner implements Runnable { null, SagaEndedEvent.name(), "", - null, EMPTY_PAYLOAD); } @@ -183,7 +182,6 @@ public class EventScanner implements Runnable { command.parentTxId(), TxStartedEvent.name(), command.compensationMethod(), - null, command.payloads() ); } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TaskStatus.java similarity index 96% rename from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java rename to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TaskStatus.java index 0c9b78b..442213b 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TaskStatus.java @@ -17,7 +17,7 @@ package org.apache.servicecomb.saga.alpha.core; -public enum CommandStatus { +public enum TaskStatus { NEW, PENDING, DONE diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java index c55090a..26309b6 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java @@ -17,10 +17,18 @@ package org.apache.servicecomb.saga.alpha.core; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW; +import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; +import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; +import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.lang.invoke.MethodHandles; +import java.util.Arrays; +import java.util.Date; +import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,9 +37,12 @@ public class TxConsistentService { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final TxEventRepository eventRepository; + private final TxTimeoutRepository timeoutRepository; - public TxConsistentService(TxEventRepository eventRepository) { + public TxConsistentService(TxEventRepository eventRepository, + TxTimeoutRepository timeoutRepository) { this.eventRepository = eventRepository; + this.timeoutRepository = timeoutRepository; } public boolean handle(TxEvent event) { @@ -40,11 +51,33 @@ public class TxConsistentService { return false; } + if (isEventWithTimeout(event)) { + CompletableFuture.runAsync(() -> saveTxTimeout(event)); + } + eventRepository.save(event); + if (Arrays.asList(TxEndedEvent.name(), SagaEndedEvent.name(), TxAbortedEvent.name()).contains(event.type())) { + CompletableFuture.runAsync(() -> timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId())); + } + return true; } + private boolean isEventWithTimeout(TxEvent event) { + return Arrays.asList(TxStartedEvent.name(), SagaStartedEvent.name()).contains(event.type()) && event.timeout() != 0; + } + + private void saveTxTimeout(TxEvent event) { + Date expireTime = new Date(event.creationTime().getTime() + SECONDS.toMillis(event.timeout())); + timeoutRepository.save( + new TxTimeout( + event.globalTxId(), + event.localTxId(), + expireTime, + NEW.name())); + } + private boolean isGlobalTxAborted(TxEvent event) { return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty(); } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java index 04d385b..5966541 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java @@ -23,6 +23,8 @@ import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; +import javax.persistence.Transient; +import javax.persistence.Version; @Entity public class TxEvent { @@ -33,7 +35,6 @@ public class TxEvent { private String serviceName; private String instanceId; private Date creationTime; - private Date expireTime; private String globalTxId; private String localTxId; private String parentTxId; @@ -41,6 +42,12 @@ public class TxEvent { private String compensationMethod; private byte[] payloads; + @Version + private long version; + + @Transient + private int timeout; + private TxEvent() { } @@ -65,27 +72,25 @@ public class TxEvent { String parentTxId, String type, String compensationMethod, - Date expireTime, byte[] payloads) { - this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, expireTime, payloads); + this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, 0, payloads); } public TxEvent( String serviceName, String instanceId, - Date creationTime, String globalTxId, String localTxId, String parentTxId, String type, String compensationMethod, - Date expireTime, + int timeout, byte[] payloads) { - this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod, - expireTime, payloads); + this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, timeout, + payloads); } - TxEvent(Long surrogateId, + public TxEvent( String serviceName, String instanceId, Date creationTime, @@ -96,8 +101,8 @@ public class TxEvent { String compensationMethod, int timeout, byte[] payloads) { - this(surrogateId, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, - compensationMethod, timeout == 0 ? null : new Date(creationTime.getTime() + timeout*1000), payloads); + this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod, + timeout, payloads); } TxEvent(Long surrogateId, @@ -109,7 +114,7 @@ public class TxEvent { String parentTxId, String type, String compensationMethod, - Date expireTime, + int timeout, byte[] payloads) { this.surrogateId = surrogateId; @@ -121,8 +126,8 @@ public class TxEvent { this.parentTxId = parentTxId; this.type = type; this.compensationMethod = compensationMethod; - this.expireTime = expireTime; this.payloads = payloads; + this.timeout = timeout; } public String serviceName() { @@ -157,10 +162,6 @@ public class TxEvent { return compensationMethod; } - public Date expireTime() { - return expireTime; - } - public byte[] payloads() { return payloads; } @@ -169,6 +170,10 @@ public class TxEvent { return surrogateId; } + public int timeout() { + return timeout; + } + @Override public String toString() { return "TxEvent{" + @@ -181,7 +186,7 @@ public class TxEvent { ", parentTxId='" + parentTxId + '\'' + ", type='" + type + '\'' + ", compensationMethod='" + compensationMethod + '\'' + - ", expireTime='" + expireTime + '\'' + + ", timeout='" + timeout + '\'' + '}'; } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java index ec564f9..b974bd9 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java @@ -31,7 +31,5 @@ public interface TxEventRepository { Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type); - Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id); - - void deleteDuplicateEvents(List<String> types); + void deleteDuplicateEvents(String type); } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java new file mode 100644 index 0000000..dc365e3 --- /dev/null +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.core; + +import java.util.Date; + +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Version; + +@Entity +public class TxTimeout { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long surrogateId; + + private String globalTxId; + private String localTxId; + private Date expireTime; + private String status; + + @Version + private long version; + + TxTimeout() { + } + + public TxTimeout(String globalTxId, String localTxId, Date expireTime, String status) { + this.globalTxId = globalTxId; + this.localTxId = localTxId; + this.expireTime = expireTime; + this.status = status; + } + + public String globalTxId() { + return globalTxId; + } + + public String localTxId() { + return localTxId; + } + + public Date expireTime() { + return expireTime; + } + + public String status() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + @Override + public String toString() { + return "TxTimeout{" + + "globalTxId='" + globalTxId + '\'' + + ", localTxId='" + localTxId + '\'' + + ", expireTime=" + expireTime + + ", status=" + status + + '}'; + } +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java similarity index 76% rename from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java rename to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java index eb820d6..88758c7 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java @@ -15,10 +15,14 @@ * limitations under the License. */ -package org.apache.servicecomb.saga.omega.transaction; +package org.apache.servicecomb.saga.alpha.core; -public class OmegaTxTimeoutException extends RuntimeException { - public OmegaTxTimeoutException(String cause) { - super(cause); - } +import java.util.List; + +public interface TxTimeoutRepository { + void save(TxTimeout event); + + void markTxTimeoutAsDone(String globalTxId, String localTxId); + + List<TxEvent> findFirstTimeoutTxToAbort(); } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java index b201ebe..4ded48a 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java @@ -28,7 +28,6 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import java.util.Date; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -160,13 +159,11 @@ public class CompositeOmegaCallbackTest { return new TxEvent( serviceName, instanceId, - new Date(), uniquify("globalTxId"), uniquify("localTxId"), UUID.randomUUID().toString(), eventType.name(), getClass().getCanonicalName(), - null, uniquify("blah").getBytes()); } } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index a73c754..43d3164 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -19,17 +19,19 @@ package org.apache.servicecomb.saga.alpha.core; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static java.util.Collections.emptyList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; +import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.contains; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; -import java.util.Date; import java.util.Deque; import java.util.List; import java.util.Optional; @@ -71,12 +73,31 @@ public class TxConsistentServiceTest { } @Override - public Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id) { - return Optional.empty(); + public void deleteDuplicateEvents(String type) { + } + }; + + private final Deque<TxTimeout> timeouts = new ConcurrentLinkedDeque<>(); + private final TxTimeoutRepository timeoutRepository = new TxTimeoutRepository() { + @Override + public void save(TxTimeout timeout) { + timeouts.add(timeout); + } + + @Override + public void markTxTimeoutAsDone(String globalTxId, String localTxId) { + for (TxTimeout timeout : timeouts) { + if (timeout.globalTxId().equals(globalTxId) && + timeout.localTxId().equals(localTxId)) { + timeout.setStatus(DONE.name()); + break; + } + } } @Override - public void deleteDuplicateEvents(List<String> types) { + public List<TxEvent> findFirstTimeoutTxToAbort() { + return null; } }; @@ -88,7 +109,7 @@ public class TxConsistentServiceTest { private final String compensationMethod = getClass().getCanonicalName(); - private final TxConsistentService consistentService = new TxConsistentService(eventRepository); + private final TxConsistentService consistentService = new TxConsistentService(eventRepository, timeoutRepository); private final byte[] payloads = "yeah".getBytes(); @Test @@ -105,6 +126,7 @@ public class TxConsistentServiceTest { } assertThat(this.events, contains(events)); + assertThat(timeouts.isEmpty(), is(true)); } @Test @@ -118,20 +140,56 @@ public class TxConsistentServiceTest { consistentService.handle(event); assertThat(events.size(), is(2)); + assertThat(timeouts.isEmpty(), is(true)); + } + + @Test + public void persistTimeoutEventOnArrival() { + TxEvent[] events = { + newEventWithTimeout(SagaStartedEvent, globalTxId,3), + newEventWithTimeout(TxStartedEvent, 2), + newEvent(TxEndedEvent), + newEvent(TxCompensatedEvent), + eventOf(SagaEndedEvent, globalTxId)}; + + for (TxEvent event : events) { + consistentService.handle(event); + } + + assertThat(this.events, contains(events)); + assertThat(timeouts.size(), is(2)); + await().atMost(1, SECONDS).until(this::allTimeoutIsDone); + } + + private boolean allTimeoutIsDone() { + for (TxTimeout timeout : timeouts) { + if (!timeout.status().equals(DONE.name())) { + return false; + } + } + return true; } private TxEvent newEvent(EventType eventType) { - return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, null, payloads); + return newEventWithTimeout(eventType, 0); + } + + private TxEvent newEventWithTimeout(EventType eventType, int timeout) { + return newEventWithTimeout(eventType, localTxId, timeout); + } + + private TxEvent newEventWithTimeout(EventType eventType, String localTxId, int timeout) { + return new TxEvent(serviceName, instanceId, globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, timeout, payloads); } private TxEvent eventOf(EventType eventType, String localTxId) { - return new TxEvent(serviceName, instanceId, new Date(), + return new TxEvent(serviceName, + instanceId, globalTxId, localTxId, UUID.randomUUID().toString(), eventType.name(), compensationMethod, - null, payloads); } } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java index 4b65528..c14ffd9 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java @@ -20,7 +20,6 @@ package org.apache.servicecomb.saga.alpha.core; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; -import java.util.Date; import java.util.UUID; class TxEventMaker { @@ -28,13 +27,11 @@ class TxEventMaker { return new TxEvent( uniquify("serviceName"), uniquify("instanceId"), - new Date(), uniquify("globalTxId"), uniquify("localTxId"), UUID.randomUUID().toString(), TxStartedEvent.name(), TxEventMaker.class.getCanonicalName(), - null, uniquify("blah").getBytes()); } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java index a431437..9472d0d 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java @@ -35,6 +35,7 @@ import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner; import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback; import org.apache.servicecomb.saga.alpha.core.TxConsistentService; import org.apache.servicecomb.saga.alpha.core.TxEventRepository; +import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.domain.EntityScan; import org.springframework.context.annotation.Bean; @@ -70,6 +71,11 @@ class AlphaConfig { } @Bean + TxTimeoutRepository springTxTimeoutRepository(TxTimeoutEntityRepository timeoutRepo) { + return new SpringTxTimeoutRepository(timeoutRepo); + } + + @Bean ScheduledExecutorService compensationScheduler() { return scheduler; } @@ -80,16 +86,15 @@ class AlphaConfig { ScheduledExecutorService scheduler, TxEventRepository eventRepository, CommandRepository commandRepository, + TxTimeoutRepository timeoutRepository, OmegaCallback omegaCallback, Map<String, Map<String, OmegaCallback>> omegaCallbacks) { new EventScanner(scheduler, - eventRepository, - commandRepository, - omegaCallback, - eventPollingInterval).run(); + eventRepository, commandRepository, timeoutRepository, + omegaCallback, eventPollingInterval).run(); - TxConsistentService consistentService = new TxConsistentService(eventRepository); + TxConsistentService consistentService = new TxConsistentService(eventRepository, timeoutRepository); ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks); new Thread(startable::start).start(); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java index 679b6ba..ee7e2e4 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java @@ -75,18 +75,16 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { @Override public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) { - Date date = new Date(message.getTimestamp()); - int timeout = message.getTimeout(); boolean ok = txConsistentService.handle(new TxEvent( message.getServiceName(), message.getInstanceId(), - date, + new Date(message.getTimestamp()), message.getGlobalTxId(), message.getLocalTxId(), message.getParentTxId().isEmpty() ? null : message.getParentTxId(), message.getType(), message.getCompensationMethod(), - timeout == 0 ? null : new Date(date.getTime() + timeout), + message.getTimeout(), message.getPayloads().toByteArray() )); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java index afbdaf5..086f88e 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java @@ -17,9 +17,9 @@ package org.apache.servicecomb.saga.alpha.server; -import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE; -import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW; -import static org.apache.servicecomb.saga.alpha.core.CommandStatus.PENDING; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING; import java.lang.invoke.MethodHandles; import java.util.LinkedHashMap; @@ -33,11 +33,9 @@ import org.apache.servicecomb.saga.alpha.core.CommandRepository; import org.apache.servicecomb.saga.alpha.core.TxEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.data.domain.PageRequest; public class SpringCommandRepository implements CommandRepository { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final PageRequest SINGLE_COMMAND_REQUEST = new PageRequest(0, 1); private final TxEventEnvelopeRepository eventRepository; private final CommandEntityRepository commandRepository; diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java index c3e8f04..5531d8f 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java @@ -25,6 +25,7 @@ import org.apache.servicecomb.saga.alpha.core.TxEventRepository; import org.springframework.data.domain.PageRequest; class SpringTxEventRepository implements TxEventRepository { + private static final PageRequest SINGLE_TX_EVENT_REQUEST = new PageRequest(0, 1); private final TxEventEnvelopeRepository eventRepo; SpringTxEventRepository(TxEventEnvelopeRepository eventRepo) { @@ -48,7 +49,7 @@ class SpringTxEventRepository implements TxEventRepository { @Override public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) { - return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, new PageRequest(0, 1)); + return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, SINGLE_TX_EVENT_REQUEST); } @Override @@ -57,12 +58,7 @@ class SpringTxEventRepository implements TxEventRepository { } @Override - public Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id) { - return eventRepo.findFirstTimeoutSurrogateIdGreaterThan(id); - } - - @Override - public void deleteDuplicateEvents(List<String> types) { - eventRepo.deleteByTypes(types); + public void deleteDuplicateEvents(String type) { + eventRepo.deleteByType(type); } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java new file mode 100644 index 0000000..71c808d --- /dev/null +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.server; + +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING; + +import java.util.ArrayList; +import java.util.List; + +import javax.transaction.Transactional; + +import org.apache.servicecomb.saga.alpha.core.TxEvent; +import org.apache.servicecomb.saga.alpha.core.TxTimeout; +import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository; +import org.springframework.data.domain.PageRequest; + +public class SpringTxTimeoutRepository implements TxTimeoutRepository { + private final TxTimeoutEntityRepository timeoutRepo; + + SpringTxTimeoutRepository(TxTimeoutEntityRepository timeoutRepo) { + this.timeoutRepo = timeoutRepo; + } + + @Override + public void save(TxTimeout event) { + timeoutRepo.save(event); + } + + @Override + public void markTxTimeoutAsDone(String globalTxId, String localTxId) { + timeoutRepo.updateStatusByGlobalTxIdAndLocalTxId(DONE.name(), globalTxId, localTxId); + } + + @Transactional + @Override + public List<TxEvent> findFirstTimeoutTxToAbort() { + List<TxEvent> timeoutEvents = timeoutRepo.findFirstTimeoutTxOrderByExpireTimeAsc(new PageRequest(0, 1)); + List<TxEvent> pendingTimeoutEvents = new ArrayList<>(); + timeoutEvents.forEach(event -> { + if (timeoutRepo.updateStatusFromNewByGlobalTxIdAndLocalTxId(PENDING.name(), event.globalTxId(), event.localTxId()) + != 0) { + pendingTimeoutEvents.add(event); + } + }); + return pendingTimeoutEvents; + } +} \ No newline at end of file diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index e46b264..c4984f9 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -40,7 +40,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent(" + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, " - + "t.type, t.compensationMethod, t.expireTime, t.payloads" + + "t.type, t.compensationMethod, t.payloads" + ") FROM TxEvent t " + "WHERE t.globalTxId = ?1 AND t.type = ?2") List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type); @@ -78,26 +78,13 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId); - @Query("SELECT t FROM TxEvent t " - + "WHERE t.type IN ('TxStartedEvent', 'SagaStartedEvent') " - + " AND t.expireTime IS NOT NULL " - + " AND t.expireTime < CURRENT_TIMESTAMP " - + " AND t.surrogateId > ?1 AND NOT EXISTS (" - + " SELECT t1.globalTxId" - + " FROM TxEvent t1 " - + " WHERE t1.globalTxId = t.globalTxId " - + " AND t1.localTxId = t.localTxId " - + " AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent')) " - + "ORDER BY t.surrogateId ASC") - Optional<TxEvent> findFirstTimeoutSurrogateIdGreaterThan(long surrogateId); - @Transactional @Modifying(clearAutomatically = true) @Query("DELETE FROM TxEvent t " - + "WHERE t.type IN ?1 AND t.surrogateId NOT IN (" + + "WHERE t.type = ?1 AND t.surrogateId NOT IN (" + " SELECT MAX(t1.surrogateId) FROM TxEvent t1 " - + " WHERE t1.type = t.type" + + " WHERE t1.type = ?1 " + " GROUP BY t1.globalTxId" + ")") - void deleteByTypes(List<String> types); + void deleteByType(String type); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java new file mode 100644 index 0000000..cc39397 --- /dev/null +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.server; + +import java.util.List; + +import javax.persistence.LockModeType; +import javax.transaction.Transactional; + +import org.apache.servicecomb.saga.alpha.core.TxEvent; +import org.apache.servicecomb.saga.alpha.core.TxTimeout; +import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.Lock; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.CrudRepository; +import org.springframework.data.repository.query.Param; + +interface TxTimeoutEntityRepository extends CrudRepository<TxTimeout, Long> { + + @Transactional + @Modifying(clearAutomatically = true) + @Query("UPDATE org.apache.servicecomb.saga.alpha.core.TxTimeout t " + + "SET t.status = :status " + + "WHERE t.globalTxId = :globalTxId " + + " AND t.localTxId = :localTxId") + void updateStatusByGlobalTxIdAndLocalTxId( + @Param("status") String status, + @Param("globalTxId") String globalTxId, + @Param("localTxId") String localTxId); + + @Transactional + @Modifying(clearAutomatically = true) + @Query("UPDATE org.apache.servicecomb.saga.alpha.core.TxTimeout t " + + "SET t.status = :status " + + "WHERE t.globalTxId = :globalTxId " + + " AND t.localTxId = :localTxId " + + " AND t.status = 'NEW'") + int updateStatusFromNewByGlobalTxIdAndLocalTxId( + @Param("status") String status, + @Param("globalTxId") String globalTxId, + @Param("localTxId") String localTxId); + + @Lock(LockModeType.OPTIMISTIC) + @Query("SELECT te FROM TxEvent AS te " + + "INNER JOIN TxTimeout AS tt " + + "ON te.globalTxId = tt.globalTxId " + + " AND te.localTxId = tt.localTxId " + + " AND tt.status = 'NEW' " + + " AND tt.expireTime < CURRENT_TIMESTAMP " + + "ORDER BY tt.expireTime ASC") + List<TxEvent> findFirstTimeoutTxOrderByExpireTimeAsc(Pageable pageable); +} diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql index a3aee1c..484c5e3 100644 --- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql +++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql @@ -3,13 +3,13 @@ CREATE TABLE IF NOT EXISTS TxEvent ( serviceName varchar(16) NOT NULL, instanceId varchar(36) NOT NULL, creationTime timestamp(6) NOT NULL DEFAULT CURRENT_DATE, - expireTime timestamp(6) NULL, globalTxId varchar(36) NOT NULL, localTxId varchar(36) NOT NULL, parentTxId varchar(36) DEFAULT NULL, type varchar(50) NOT NULL, compensationMethod varchar(256) NOT NULL, - payloads bytea + payloads bytea, + version bigint NOT NULL ); CREATE INDEX IF NOT EXISTS saga_events_index ON TxEvent (surrogateId, globalTxId, localTxId, type); @@ -31,3 +31,15 @@ CREATE TABLE IF NOT EXISTS Command ( ); CREATE INDEX IF NOT EXISTS saga_commands_index ON Command (surrogateId, eventId, globalTxId, localTxId, status); + + +CREATE TABLE IF NOT EXISTS TxTimeout ( + surrogateId BIGSERIAL PRIMARY KEY, + globalTxId varchar(36) NOT NULL, + localTxId varchar(36) NOT NULL, + expireTime TIMESTAMP NOT NULL, + status varchar(12), + version bigint NOT NULL +); + +CREATE INDEX IF NOT EXISTS saga_timeouts_index ON TxTimeout (surrogateId, expireTime, globalTxId, localTxId, status); diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index 585e755..0073648 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -19,6 +19,8 @@ package org.apache.servicecomb.saga.alpha.server; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; @@ -31,7 +33,6 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.Queue; @@ -48,6 +49,8 @@ import org.apache.servicecomb.saga.alpha.core.OmegaCallback; import org.apache.servicecomb.saga.alpha.core.TxConsistentService; import org.apache.servicecomb.saga.alpha.core.TxEvent; import org.apache.servicecomb.saga.alpha.core.TxEventRepository; +import org.apache.servicecomb.saga.alpha.core.TxTimeout; +import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository; import org.apache.servicecomb.saga.common.EventType; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand; @@ -108,12 +111,18 @@ public class AlphaIntegrationTest { private CommandRepository commandRepository; @Autowired - private CommandEntityRepository commandEntityRepository; + private TxTimeoutRepository timeoutRepository; + + @Autowired + private TxTimeoutEntityRepository timeoutEntityRepository; @Autowired private OmegaCallback omegaCallback; @Autowired + private CommandEntityRepository commandEntityRepository; + + @Autowired private Map<String, Map<String, OmegaCallback>> omegaCallbacks; @Autowired @@ -145,6 +154,7 @@ public class AlphaIntegrationTest { try { eventRepo.deleteAll(); commandEntityRepository.deleteAll(); + timeoutEntityRepository.deleteAll(); deleted = true; } catch (Exception ignored) { } @@ -373,12 +383,22 @@ public class AlphaIntegrationTest { asyncStub.onConnected(serviceConfig, compensateResponseObserver); blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1)); - await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3); + assertThat(timeoutEntityRepository.count(), is(1L)); + TxTimeout timeout = timeoutEntityRepository.findOne(1L); + assertThat(timeout.status(), is(NEW.name())); + + await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3); List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); assertThat(events.get(0).type(), is(SagaStartedEvent.name())); assertThat(events.get(1).type(), is(TxAbortedEvent.name())); assertThat(events.get(2).type(), is(SagaEndedEvent.name())); + + assertThat(timeoutEntityRepository.count(), is(1L)); + timeout = timeoutEntityRepository.findOne(1L); + assertThat(timeout.status(), is(DONE.name())); + assertThat(timeout.globalTxId(), is(globalTxId)); + assertThat(timeout.localTxId(), is(globalTxId)); } @Test @@ -387,13 +407,19 @@ public class AlphaIntegrationTest { blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId)); blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1)); - await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4); + await().atMost(2, SECONDS).until(() -> eventRepo.count() == 4); List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); assertThat(events.get(0).type(), is(SagaStartedEvent.name())); assertThat(events.get(1).type(), is(TxStartedEvent.name())); assertThat(events.get(2).type(), is(TxAbortedEvent.name())); assertThat(events.get(3).type(), is(SagaEndedEvent.name())); + + assertThat(timeoutEntityRepository.count(), is(1L)); + TxTimeout timeout = timeoutEntityRepository.findOne(1L); + assertThat(timeout.status(), is(DONE.name())); + assertThat(timeout.globalTxId(), is(globalTxId)); + assertThat(timeout.localTxId(), is(localTxId)); } private GrpcAck onCompensation(GrpcCompensateCommand command) { @@ -416,13 +442,11 @@ public class AlphaIntegrationTest { return new TxEvent( serviceName, instanceId, - new Date(), globalTxId, localTxId, parentTxId, TxAbortedEvent.name(), compensationMethod, - null, payload.getBytes()); } @@ -508,7 +532,7 @@ public class AlphaIntegrationTest { Executors.newSingleThreadScheduledExecutor(), eventRepository, commandRepository, - omegaCallback, - 1).run(); + timeoutRepository, + omegaCallback, 1).run(); } } diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql index 53bcd1e..0958389 100644 --- a/alpha/alpha-server/src/test/resources/schema.sql +++ b/alpha/alpha-server/src/test/resources/schema.sql @@ -3,14 +3,13 @@ CREATE TABLE IF NOT EXISTS TxEvent ( serviceName varchar(36) NOT NULL, instanceId varchar(36) NOT NULL, creationTime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, - expireTime TIMESTAMP NULL, globalTxId varchar(36) NOT NULL, localTxId varchar(36) NOT NULL, parentTxId varchar(36) DEFAULT NULL, type varchar(50) NOT NULL, compensationMethod varchar(256) NOT NULL, payloads varbinary(10240), --- version bigint NOT NULL + version bigint NOT NULL ); CREATE TABLE IF NOT EXISTS Command ( @@ -27,3 +26,12 @@ CREATE TABLE IF NOT EXISTS Command ( lastModified TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, version bigint NOT NULL ); + +CREATE TABLE IF NOT EXISTS TxTimeout ( + surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY, + globalTxId varchar(36) NOT NULL, + localTxId varchar(36) NOT NULL, + expireTime TIMESTAMP NOT NULL, + status varchar(12), + version bigint NOT NULL +); diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java index 7328fef..388f237 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java @@ -47,14 +47,13 @@ public class SagaStartAspect { initializeOmegaContext(); Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); - OnceAwareInterceptor interceptor = new OnceAwareInterceptor(sagaStartAnnotationProcessor); - interceptor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout()); + sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout()); LOG.debug("Initialized context {} before execution of method {}", context, method.toString()); try { Object result = joinPoint.proceed(); - interceptor.postIntercept(context.globalTxId(), method.toString()); + sagaStartAnnotationProcessor.postIntercept(context.globalTxId(), method.toString()); LOG.debug("Transaction with context {} has finished.", context); return result; diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java index 0a87491..afdb958 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java @@ -39,7 +39,6 @@ public class OnceAwareInterceptorTest { private final AtomicInteger postInterceptInvoked = new AtomicInteger(); private final AtomicInteger onErrorInvoked = new AtomicInteger(); - private final AtomicInteger onTimeoutInvoked = new AtomicInteger(); private final EventAwareInterceptor underlying = new EventAwareInterceptor() { @Override @@ -54,11 +53,7 @@ public class OnceAwareInterceptorTest { @Override public void onError(String parentTxId, String compensationMethod, Throwable throwable) { - if (throwable instanceof OmegaTxTimeoutException) { - onTimeoutInvoked.incrementAndGet(); - } else { - onErrorInvoked.incrementAndGet(); - } + onErrorInvoked.incrementAndGet(); } }; -- To stop receiving notification emails like this one, please contact ningji...@apache.org.