This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 13d930970b0c8a6abe96f096399d050cba2019e1 Author: Eric Lee <[email protected]> AuthorDate: Tue Jan 23 15:10:35 2018 +0800 SCB-239 omega stateless Signed-off-by: Eric Lee <[email protected]> --- .../servicecomb/saga/alpha/core/EventScanner.java | 43 +++++++++++++++++++--- .../servicecomb/saga/alpha/core/TxEvent.java | 23 +++++++++--- .../saga/alpha/core/TxEventRepository.java | 6 ++- .../alpha/core/CompositeOmegaCallbackTest.java | 1 + .../saga/alpha/core/TxConsistentServiceTest.java | 15 +++++++- .../servicecomb/saga/alpha/core/TxEventMaker.java | 6 +-- .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 7 +++- .../saga/alpha/server/SpringTxEventRepository.java | 14 ++++++- .../alpha/server/TxEventEnvelopeRepository.java | 29 +++++++++++++-- .../src/main/resources/schema-postgresql.sql | 1 + .../saga/alpha/server/AlphaIntegrationTest.java | 42 +++++++++++++++++++-- alpha/alpha-server/src/test/resources/schema.sql | 1 + .../grpc/LoadBalancedClusterMessageSenderTest.java | 6 ++- .../connector/grpc/RetryableMessageSenderTest.java | 4 +- .../spring/TransactionInterceptionTest.java | 20 +++++----- .../omega/transaction/CompensableInterceptor.java | 6 +-- .../omega/transaction/EventAwareInterceptor.java | 4 +- .../saga/omega/transaction/SagaEndedEvent.java | 2 +- .../transaction/SagaStartAnnotationProcessor.java | 4 +- .../saga/omega/transaction/SagaStartAspect.java | 2 +- .../saga/omega/transaction/SagaStartedEvent.java | 5 +-- .../omega/transaction/TimeAwareInterceptor.java | 4 +- .../saga/omega/transaction/TransactionAspect.java | 2 +- .../saga/omega/transaction/TxAbortedEvent.java | 6 +-- .../saga/omega/transaction/TxCompensatedEvent.java | 2 +- .../saga/omega/transaction/TxEndedEvent.java | 2 +- .../saga/omega/transaction/TxEvent.java | 14 +++++-- .../saga/omega/transaction/TxStartedEvent.java | 5 ++- .../transaction/CompensableInterceptorTest.java | 2 +- .../SagaStartAnnotationProcessorTest.java | 4 +- .../transaction/TimeAwareInterceptorTest.java | 2 +- .../src/main/proto/GrpcTxEvent.proto | 1 + 32 files changed, 214 insertions(+), 71 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java index 2d51a74..0980f3a 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java @@ -19,12 +19,13 @@ package org.apache.servicecomb.saga.alpha.core; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; +import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.lang.invoke.MethodHandles; -import java.util.Date; +import java.util.Arrays; import java.util.concurrent.ScheduledExecutorService; import org.slf4j.Logger; @@ -42,6 +43,7 @@ public class EventScanner implements Runnable { private long nextEndedEventId; private long nextCompensatedEventId; + private long nextTimeoutEventId; public EventScanner(ScheduledExecutorService scheduler, TxEventRepository eventRepository, @@ -64,10 +66,12 @@ public class EventScanner implements Runnable { private void pollEvents() { scheduler.scheduleWithFixedDelay( () -> { + abortTimeoutEvent(); saveUncompensatedEventsToCommands(); compensate(); updateCompensatedCommands(); - deleteDuplicateSagaEndedEvents(); + deleteDuplicateEvents(); + updateTransactionStatus(); }, 0, eventPollingInterval, @@ -92,11 +96,11 @@ public class EventScanner implements Runnable { }); } - private void deleteDuplicateSagaEndedEvents() { + private void deleteDuplicateEvents() { try { - eventRepository.deleteDuplicateEvents(SagaEndedEvent.name()); + eventRepository.deleteDuplicateEvents(Arrays.asList(TxAbortedEvent.name(), SagaEndedEvent.name())); } catch (Exception e) { - log.warn("Failed to delete duplicate SagaEndedEvent", e); + log.warn("Failed to delete duplicate event", e); } } @@ -109,6 +113,19 @@ public class EventScanner implements Runnable { markSagaEnded(event); } + private void abortTimeoutEvent() { + eventRepository.findFirstTimeoutEventByIdGreaterThan(nextTimeoutEventId) + .ifPresent((TxEvent event) -> { + log.info("Found timeout event {}", event); + nextTimeoutEventId = event.id(); + eventRepository.save(toTxAbortedEvent(event)); + }); + } + + private void updateTransactionStatus() { + eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEnd); + } + private void markSagaEnded(TxEvent event) { if (commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) { markGlobalTxEnd(event); @@ -120,16 +137,29 @@ public class EventScanner implements Runnable { log.info("Marked end of transaction with globalTxId {}", event.globalTxId()); } + private TxEvent toTxAbortedEvent(TxEvent event) { + return new TxEvent( + event.serviceName(), + event.instanceId(), + event.globalTxId(), + event.localTxId(), + event.parentTxId(), + TxAbortedEvent.name(), + "", + null, + EMPTY_PAYLOAD); + } + private TxEvent toSagaEndedEvent(TxEvent event) { return new TxEvent( event.serviceName(), event.instanceId(), - new Date(), event.globalTxId(), event.globalTxId(), null, SagaEndedEvent.name(), "", + null, EMPTY_PAYLOAD); } @@ -153,6 +183,7 @@ public class EventScanner implements Runnable { command.parentTxId(), TxStartedEvent.name(), command.compensationMethod(), + null, command.payloads() ); } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java index 1364cb7..04d385b 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java @@ -33,6 +33,7 @@ public class TxEvent { private String serviceName; private String instanceId; private Date creationTime; + private Date expireTime; private String globalTxId; private String localTxId; private String parentTxId; @@ -64,8 +65,9 @@ public class TxEvent { String parentTxId, String type, String compensationMethod, + Date expireTime, byte[] payloads) { - this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads); + this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, expireTime, payloads); } public TxEvent( @@ -77,21 +79,25 @@ public class TxEvent { String parentTxId, String type, String compensationMethod, + Date expireTime, byte[] payloads) { - this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod, payloads); + this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod, + expireTime, payloads); } - public TxEvent( - long id, + TxEvent(Long surrogateId, String serviceName, String instanceId, + Date creationTime, String globalTxId, String localTxId, String parentTxId, String type, String compensationMethod, + int timeout, byte[] payloads) { - this(id, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads); + this(surrogateId, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, + compensationMethod, timeout == 0 ? null : new Date(creationTime.getTime() + timeout*1000), payloads); } TxEvent(Long surrogateId, @@ -103,6 +109,7 @@ public class TxEvent { String parentTxId, String type, String compensationMethod, + Date expireTime, byte[] payloads) { this.surrogateId = surrogateId; @@ -114,6 +121,7 @@ public class TxEvent { this.parentTxId = parentTxId; this.type = type; this.compensationMethod = compensationMethod; + this.expireTime = expireTime; this.payloads = payloads; } @@ -149,6 +157,10 @@ public class TxEvent { return compensationMethod; } + public Date expireTime() { + return expireTime; + } + public byte[] payloads() { return payloads; } @@ -169,6 +181,7 @@ public class TxEvent { ", parentTxId='" + parentTxId + '\'' + ", type='" + type + '\'' + ", compensationMethod='" + compensationMethod + '\'' + + ", expireTime='" + expireTime + '\'' + '}'; } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java index b61aa06..ec564f9 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java @@ -23,11 +23,15 @@ import java.util.Optional; public interface TxEventRepository { void save(TxEvent event); + Optional<TxEvent> findFirstAbortedGlobalTransaction(); + List<TxEvent> findTransactions(String globalTxId, String type); List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type); Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type); - void deleteDuplicateEvents(String type); + Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id); + + void deleteDuplicateEvents(List<String> types); } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java index 5cda4c5..b201ebe 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java @@ -166,6 +166,7 @@ public class CompositeOmegaCallbackTest { UUID.randomUUID().toString(), eventType.name(), getClass().getCanonicalName(), + null, uniquify("blah").getBytes()); } } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index 231d5bf..a73c754 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -49,6 +49,11 @@ public class TxConsistentServiceTest { } @Override + public Optional<TxEvent> findFirstAbortedGlobalTransaction() { + return Optional.empty(); + } + + @Override public List<TxEvent> findTransactions(String globalTxId, String type) { return events.stream() .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type())) @@ -66,7 +71,12 @@ public class TxConsistentServiceTest { } @Override - public void deleteDuplicateEvents(String type) { + public Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id) { + return Optional.empty(); + } + + @Override + public void deleteDuplicateEvents(List<String> types) { } }; @@ -111,7 +121,7 @@ public class TxConsistentServiceTest { } private TxEvent newEvent(EventType eventType) { - return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, payloads); + return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, null, payloads); } private TxEvent eventOf(EventType eventType, String localTxId) { @@ -121,6 +131,7 @@ public class TxConsistentServiceTest { UUID.randomUUID().toString(), eventType.name(), compensationMethod, + null, payloads); } } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java index 68c33a9..4b65528 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java @@ -17,9 +17,8 @@ package org.apache.servicecomb.saga.alpha.core; -import org.apache.servicecomb.saga.common.EventType; - import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; +import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.util.Date; import java.util.UUID; @@ -33,8 +32,9 @@ class TxEventMaker { uniquify("globalTxId"), uniquify("localTxId"), UUID.randomUUID().toString(), - EventType.TxStartedEvent.name(), + TxStartedEvent.name(), TxEventMaker.class.getCanonicalName(), + null, uniquify("blah").getBytes()); } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java index eced7f9..679b6ba 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java @@ -56,7 +56,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) { omegaCallbacks .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>()) - .computeIfAbsent(request.getInstanceId(), key -> new GrpcOmegaCallback(responseObserver)); + .put(request.getInstanceId(), new GrpcOmegaCallback(responseObserver)); } // TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback may not be registered on disconnected @@ -75,15 +75,18 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { @Override public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) { + Date date = new Date(message.getTimestamp()); + int timeout = message.getTimeout(); boolean ok = txConsistentService.handle(new TxEvent( message.getServiceName(), message.getInstanceId(), - new Date(message.getTimestamp()), + date, message.getGlobalTxId(), message.getLocalTxId(), message.getParentTxId().isEmpty() ? null : message.getParentTxId(), message.getType(), message.getCompensationMethod(), + timeout == 0 ? null : new Date(date.getTime() + timeout), message.getPayloads().toByteArray() )); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java index ad32148..c3e8f04 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java @@ -37,6 +37,11 @@ class SpringTxEventRepository implements TxEventRepository { } @Override + public Optional<TxEvent> findFirstAbortedGlobalTransaction() { + return eventRepo.findFirstAbortedGlobalTxByType(); + } + + @Override public List<TxEvent> findTransactions(String globalTxId, String type) { return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type); } @@ -52,7 +57,12 @@ class SpringTxEventRepository implements TxEventRepository { } @Override - public void deleteDuplicateEvents(String type) { - eventRepo.deleteByType(type); + public Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id) { + return eventRepo.findFirstTimeoutSurrogateIdGreaterThan(id); + } + + @Override + public void deleteDuplicateEvents(List<String> types) { + eventRepo.deleteByTypes(types); } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index 2e52fef..e46b264 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -31,8 +31,16 @@ import org.springframework.data.repository.CrudRepository; interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { List<TxEvent> findByGlobalTxId(String globalTxId); + @Query("SELECT t FROM TxEvent t " + + "WHERE t.type = 'TxAbortedEvent' AND NOT EXISTS( " + + " SELECT t1.globalTxId FROM TxEvent t1" + + " WHERE t1.globalTxId = t.globalTxId " + + " AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent'))") + Optional<TxEvent> findFirstAbortedGlobalTxByType(); + @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent(" - + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads" + + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, " + + "t.type, t.compensationMethod, t.expireTime, t.payloads" + ") FROM TxEvent t " + "WHERE t.globalTxId = ?1 AND t.type = ?2") List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type); @@ -70,13 +78,26 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId); + @Query("SELECT t FROM TxEvent t " + + "WHERE t.type IN ('TxStartedEvent', 'SagaStartedEvent') " + + " AND t.expireTime IS NOT NULL " + + " AND t.expireTime < CURRENT_TIMESTAMP " + + " AND t.surrogateId > ?1 AND NOT EXISTS (" + + " SELECT t1.globalTxId" + + " FROM TxEvent t1 " + + " WHERE t1.globalTxId = t.globalTxId " + + " AND t1.localTxId = t.localTxId " + + " AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent')) " + + "ORDER BY t.surrogateId ASC") + Optional<TxEvent> findFirstTimeoutSurrogateIdGreaterThan(long surrogateId); + @Transactional @Modifying(clearAutomatically = true) @Query("DELETE FROM TxEvent t " - + "WHERE t.type = ?1 AND t.surrogateId NOT IN (" + + "WHERE t.type IN ?1 AND t.surrogateId NOT IN (" + " SELECT MAX(t1.surrogateId) FROM TxEvent t1 " - + " WHERE t1.type = ?1" + + " WHERE t1.type = t.type" + " GROUP BY t1.globalTxId" + ")") - void deleteByType(String type); + void deleteByTypes(List<String> types); } diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql index d6b5172..a3aee1c 100644 --- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql +++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql @@ -3,6 +3,7 @@ CREATE TABLE IF NOT EXISTS TxEvent ( serviceName varchar(16) NOT NULL, instanceId varchar(36) NOT NULL, creationTime timestamp(6) NOT NULL DEFAULT CURRENT_DATE, + expireTime timestamp(6) NULL, globalTxId varchar(36) NOT NULL, localTxId varchar(36) NOT NULL, parentTxId varchar(36) DEFAULT NULL, diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index 539f610..585e755 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.alpha.server; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; +import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; @@ -367,6 +368,34 @@ public class AlphaIntegrationTest { }); } + @Test + public void abortTimeoutSagaStartedEvent() { + asyncStub.onConnected(serviceConfig, compensateResponseObserver); + blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1)); + + await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3); + + List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); + assertThat(events.get(0).type(), is(SagaStartedEvent.name())); + assertThat(events.get(1).type(), is(TxAbortedEvent.name())); + assertThat(events.get(2).type(), is(SagaEndedEvent.name())); + } + + @Test + public void abortTimeoutTxStartedEvent() { + asyncStub.onConnected(serviceConfig, compensateResponseObserver); + blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId)); + blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1)); + + await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4); + + List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); + assertThat(events.get(0).type(), is(SagaStartedEvent.name())); + assertThat(events.get(1).type(), is(TxStartedEvent.name())); + assertThat(events.get(2).type(), is(TxAbortedEvent.name())); + assertThat(events.get(3).type(), is(SagaEndedEvent.name())); + } + private GrpcAck onCompensation(GrpcCompensateCommand command) { return blockingStub.onTxEvent( eventOf(TxCompensatedEvent, @@ -393,9 +422,14 @@ public class AlphaIntegrationTest { parentTxId, TxAbortedEvent.name(), compensationMethod, + null, payload.getBytes()); } + private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String localTxId, String parentTxId, int timeout) { + return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), timeout); + } + private GrpcTxEvent someGrpcEvent(EventType type) { return eventOf(type, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName()); } @@ -405,11 +439,11 @@ public class AlphaIntegrationTest { } private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId) { - return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName()); + return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), 0); } private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) { - return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod); + return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0); } private GrpcTxEvent eventOf(EventType eventType, @@ -417,7 +451,8 @@ public class AlphaIntegrationTest { String localTxId, String parentTxId, byte[] payloads, - String compensationMethod) { + String compensationMethod, + int timeout) { return GrpcTxEvent.newBuilder() .setServiceName(serviceName) @@ -428,6 +463,7 @@ public class AlphaIntegrationTest { .setParentTxId(parentTxId == null ? "" : parentTxId) .setType(eventType.name()) .setCompensationMethod(compensationMethod) + .setTimeout(timeout) .setPayloads(ByteString.copyFrom(payloads)) .build(); } diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql index 344fdda..53bcd1e 100644 --- a/alpha/alpha-server/src/test/resources/schema.sql +++ b/alpha/alpha-server/src/test/resources/schema.sql @@ -3,6 +3,7 @@ CREATE TABLE IF NOT EXISTS TxEvent ( serviceName varchar(36) NOT NULL, instanceId varchar(36) NOT NULL, creationTime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, + expireTime TIMESTAMP NULL, globalTxId varchar(36) NOT NULL, localTxId varchar(36) NOT NULL, parentTxId varchar(36) DEFAULT NULL, diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java index 8062ae9..bb24c5c 100644 --- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java @@ -97,7 +97,8 @@ public class LoadBalancedClusterMessageSenderTest { private final String localTxId = uniquify("localTxId"); private final String parentTxId = uniquify("parentTxId"); private final String compensationMethod = getClass().getCanonicalName(); - private final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, "blah"); + private final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, + compensationMethod, 0, "blah"); private final String serviceName = uniquify("serviceName"); private final String[] addresses = {"localhost:8080", "localhost:8090"}; @@ -299,7 +300,7 @@ public class LoadBalancedClusterMessageSenderTest { public void forwardSendResult() { assertThat(messageSender.send(event).aborted(), is(false)); - TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", "blah"); + TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, "blah"); assertThat(messageSender.send(rejectEvent).aborted(), is(true)); } @@ -356,6 +357,7 @@ public class LoadBalancedClusterMessageSenderTest { request.getLocalTxId(), request.getParentTxId(), request.getCompensationMethod(), + 0, new String(request.getPayloads().toByteArray()))); sleep(); diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java index 562c50f..95bda85 100644 --- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java @@ -42,7 +42,7 @@ public class RetryableMessageSenderTest { private final String globalTxId = uniquify("globalTxId"); private final String localTxId = uniquify("localTxId"); - private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x"); + private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", 0); @Test public void sendEventWhenSenderIsAvailable() { @@ -56,7 +56,7 @@ public class RetryableMessageSenderTest { @Test public void blowsUpWhenEventIsSagaStarted() { - TxEvent event = new SagaStartedEvent(globalTxId, localTxId); + TxEvent event = new SagaStartedEvent(globalTxId, localTxId, 0); try { messageSender.send(event); diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java index 7daf954..9fd2a7e 100644 --- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java +++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java @@ -131,7 +131,7 @@ public class TransactionInterceptionTest { assertArrayEquals( new String[]{ - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(), new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()}, toArray(messages) ); @@ -152,7 +152,7 @@ public class TransactionInterceptionTest { assertArrayEquals( new String[]{ - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, illegalUser).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, illegalUser).toString(), new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()}, toArray(messages) ); @@ -174,9 +174,9 @@ public class TransactionInterceptionTest { assertArrayEquals( new String[]{ - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(), new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(), - new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, anotherUser).toString(), + new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, anotherUser).toString(), new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(), new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(), new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString() @@ -196,9 +196,9 @@ public class TransactionInterceptionTest { assertArrayEquals( new String[]{ - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(), new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(), - new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(), + new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, jack).toString(), new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()}, toArray(messages) ); @@ -215,9 +215,9 @@ public class TransactionInterceptionTest { assertArrayEquals( new String[]{ - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(), new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(), - new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(), + new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, jack).toString(), new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()}, toArray(messages) ); @@ -237,7 +237,7 @@ public class TransactionInterceptionTest { assertArrayEquals( new String[]{ - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(), new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()}, toArray(messages) ); @@ -255,7 +255,7 @@ public class TransactionInterceptionTest { assertArrayEquals( new String[] { - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(), new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()}, toArray(messages) ); diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java index 074a5ec..53e5158 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java @@ -29,9 +29,9 @@ class CompensableInterceptor implements EventAwareInterceptor { } @Override - public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) { - return sender - .send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message)); + public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) { + return sender.send(new TxStartedEvent( + context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, timeout, message)); } @Override diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java index 4425949..7b71dd4 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java @@ -20,7 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction; public interface EventAwareInterceptor { EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() { @Override - public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) { + public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) { return new AlphaResponse(false); } @@ -33,7 +33,7 @@ public interface EventAwareInterceptor { } }; - AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message); + AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message); void postIntercept(String parentTxId, String compensationMethod) throws Throwable; diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java index 7074d8f..8c70e3a 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java @@ -22,6 +22,6 @@ import org.apache.servicecomb.saga.common.EventType; public class SagaEndedEvent extends TxEvent { public SagaEndedEvent(String globalTxId, String localTxId) { - super(EventType.SagaEndedEvent, globalTxId, localTxId, null, ""); + super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0); } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java index 7ef021a..d3d55fe 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java @@ -32,9 +32,9 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor { } @Override - public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) { + public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) { try { - return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId())); + return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout)); } catch (OmegaException e) { throw new TransactionalException(e.getMessage(), e.getCause()); } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java index 0951752..db8e3a0 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java @@ -52,7 +52,7 @@ public class SagaStartAspect { Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); TimeAwareInterceptor interceptor = new TimeAwareInterceptor(sagaStartAnnotationProcessor); - interceptor.preIntercept(context.globalTxId(), method.toString()); + interceptor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout()); LOG.debug("Initialized context {} before execution of method {}", context, method.toString()); scheduleTimeoutTask(interceptor, method, sagaStart.timeout()); diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java index 54f61e4..cb76a26 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java @@ -20,9 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction; import org.apache.servicecomb.saga.common.EventType; public class SagaStartedEvent extends TxEvent { - - public SagaStartedEvent(String globalTxId, String localTxId) { + public SagaStartedEvent(String globalTxId, String localTxId, int timeout) { // use "" instead of null as compensationMethod requires not null in sql - super(EventType.SagaStartedEvent, globalTxId, localTxId, null, ""); + super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", timeout); } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java index 2057fbc..9de26d2 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java @@ -30,8 +30,8 @@ class TimeAwareInterceptor implements EventAwareInterceptor { } @Override - public AlphaResponse preIntercept(String parentTxId, String signature, Object... args) { - return interceptor.preIntercept(parentTxId, signature, args); + public AlphaResponse preIntercept(String parentTxId, String signature, int timeout, Object... args) { + return interceptor.preIntercept(parentTxId, signature, timeout, args); } @Override diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java index 5a61dc7..718d1fd 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java @@ -59,7 +59,7 @@ public class TransactionAspect { context.newLocalTxId(); TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor); - AlphaResponse response = interceptor.preIntercept(localTxId, signature, joinPoint.getArgs()); + AlphaResponse response = interceptor.preIntercept(localTxId, signature, compensable.timeout(), joinPoint.getArgs()); if (response.aborted()) { String abortedLocalTxId = context.localTxId(); context.setLocalTxId(localTxId); diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java index 13df2f7..d6aa533 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java @@ -17,14 +17,14 @@ package org.apache.servicecomb.saga.omega.transaction; -import org.apache.servicecomb.saga.common.EventType; - import java.io.PrintWriter; import java.io.StringWriter; +import org.apache.servicecomb.saga.common.EventType; + public class TxAbortedEvent extends TxEvent { public TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) { - super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, stackTrace(throwable)); + super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, stackTrace(throwable)); } private static String stackTrace(Throwable e) { diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java index dbbaeab..8e288df 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java @@ -21,6 +21,6 @@ import org.apache.servicecomb.saga.common.EventType; public class TxCompensatedEvent extends TxEvent { public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) { - super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod); + super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0); } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java index 4e587c8..8d6666a 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java @@ -21,6 +21,6 @@ import org.apache.servicecomb.saga.common.EventType; public class TxEndedEvent extends TxEvent { public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) { - super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod); + super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0); } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java index 1398d3e..34be420 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java @@ -17,10 +17,10 @@ package org.apache.servicecomb.saga.omega.transaction; -import org.apache.servicecomb.saga.common.EventType; - import java.util.Arrays; +import org.apache.servicecomb.saga.common.EventType; + public class TxEvent { private final long timestamp; @@ -29,9 +29,11 @@ public class TxEvent { private final String localTxId; private final String parentTxId; private final String compensationMethod; + private final int timeout; private final Object[] payloads; - public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) { + public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod, + int timeout, Object... payloads) { this.timestamp = System.currentTimeMillis(); this.type = type; this.localTxId = localTxId; @@ -39,6 +41,7 @@ public class TxEvent { this.compensationMethod = compensationMethod; this.payloads = payloads; this.globalTxId = globalTxId; + this.timeout = timeout; } public long timestamp() { @@ -69,6 +72,10 @@ public class TxEvent { return compensationMethod; } + public int timeout() { + return timeout; + } + @Override public String toString() { return type.name() + "{" + @@ -76,6 +83,7 @@ public class TxEvent { ", localTxId='" + localTxId + '\'' + ", parentTxId='" + parentTxId + '\'' + ", compensationMethod='" + compensationMethod + '\'' + + ", timeout='" + timeout + '\'' + ", payloads=" + Arrays.toString(payloads) + '}'; } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java index ce93ea3..4732d95 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java @@ -21,7 +21,8 @@ import org.apache.servicecomb.saga.common.EventType; public class TxStartedEvent extends TxEvent { - public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) { - super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, payloads); + public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, + String compensationMethod, int timeout, Object... payloads) { + super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, payloads); } } diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java index 21af7e6..0ef9d4d 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java @@ -62,7 +62,7 @@ public class CompensableInterceptorTest { @Test public void sendsTxStartedEventBefore() throws Exception { - interceptor.preIntercept(parentTxId, compensationMethod, message); + interceptor.preIntercept(parentTxId, compensationMethod, 0, message); TxEvent event = messages.get(0); diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java index 566a456..cc84fc5 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java @@ -65,7 +65,7 @@ public class SagaStartAnnotationProcessorTest { @Test public void sendsSagaStartedEvent() { - sagaStartAnnotationProcessor.preIntercept(null, null); + sagaStartAnnotationProcessor.preIntercept(null, null, 0); TxEvent event = messages.get(0); @@ -99,7 +99,7 @@ public class SagaStartAnnotationProcessorTest { doThrow(exception).when(sender).send(any()); try { - sagaStartAnnotationProcessor.preIntercept(null, null); + sagaStartAnnotationProcessor.preIntercept(null, null, 0); expectFailing(TransactionalException.class); } catch (TransactionalException e) { assertThat(e.getMessage(), is("exception")); diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java index 1136a45..0f2d2eb 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java @@ -46,7 +46,7 @@ public class TimeAwareInterceptorTest { private final EventAwareInterceptor underlying = new EventAwareInterceptor() { @Override - public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) { + public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) { return new AlphaResponse(false); } diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto index 2636881..3944eee 100644 --- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto +++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto @@ -46,6 +46,7 @@ message GrpcTxEvent { bytes payloads = 7; string serviceName = 8; string instanceId = 9; + int32 timeout = 10; } message GrpcCompensateCommand { -- To stop receiving notification emails like this one, please contact [email protected].
