This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 13d930970b0c8a6abe96f096399d050cba2019e1
Author: Eric Lee <dagang...@huawei.com>
AuthorDate: Tue Jan 23 15:10:35 2018 +0800

    SCB-239 omega stateless
    
    Signed-off-by: Eric Lee <dagang...@huawei.com>
---
 .../servicecomb/saga/alpha/core/EventScanner.java  | 43 +++++++++++++++++++---
 .../servicecomb/saga/alpha/core/TxEvent.java       | 23 +++++++++---
 .../saga/alpha/core/TxEventRepository.java         |  6 ++-
 .../alpha/core/CompositeOmegaCallbackTest.java     |  1 +
 .../saga/alpha/core/TxConsistentServiceTest.java   | 15 +++++++-
 .../servicecomb/saga/alpha/core/TxEventMaker.java  |  6 +--
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  7 +++-
 .../saga/alpha/server/SpringTxEventRepository.java | 14 ++++++-
 .../alpha/server/TxEventEnvelopeRepository.java    | 29 +++++++++++++--
 .../src/main/resources/schema-postgresql.sql       |  1 +
 .../saga/alpha/server/AlphaIntegrationTest.java    | 42 +++++++++++++++++++--
 alpha/alpha-server/src/test/resources/schema.sql   |  1 +
 .../grpc/LoadBalancedClusterMessageSenderTest.java |  6 ++-
 .../connector/grpc/RetryableMessageSenderTest.java |  4 +-
 .../spring/TransactionInterceptionTest.java        | 20 +++++-----
 .../omega/transaction/CompensableInterceptor.java  |  6 +--
 .../omega/transaction/EventAwareInterceptor.java   |  4 +-
 .../saga/omega/transaction/SagaEndedEvent.java     |  2 +-
 .../transaction/SagaStartAnnotationProcessor.java  |  4 +-
 .../saga/omega/transaction/SagaStartAspect.java    |  2 +-
 .../saga/omega/transaction/SagaStartedEvent.java   |  5 +--
 .../omega/transaction/TimeAwareInterceptor.java    |  4 +-
 .../saga/omega/transaction/TransactionAspect.java  |  2 +-
 .../saga/omega/transaction/TxAbortedEvent.java     |  6 +--
 .../saga/omega/transaction/TxCompensatedEvent.java |  2 +-
 .../saga/omega/transaction/TxEndedEvent.java       |  2 +-
 .../saga/omega/transaction/TxEvent.java            | 14 +++++--
 .../saga/omega/transaction/TxStartedEvent.java     |  5 ++-
 .../transaction/CompensableInterceptorTest.java    |  2 +-
 .../SagaStartAnnotationProcessorTest.java          |  4 +-
 .../transaction/TimeAwareInterceptorTest.java      |  2 +-
 .../src/main/proto/GrpcTxEvent.proto               |  1 +
 32 files changed, 214 insertions(+), 71 deletions(-)

diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 2d51a74..0980f3a 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -19,12 +19,13 @@ package org.apache.servicecomb.saga.alpha.core;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
-import java.util.Date;
+import java.util.Arrays;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.slf4j.Logger;
@@ -42,6 +43,7 @@ public class EventScanner implements Runnable {
 
   private long nextEndedEventId;
   private long nextCompensatedEventId;
+  private long nextTimeoutEventId;
 
   public EventScanner(ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
@@ -64,10 +66,12 @@ public class EventScanner implements Runnable {
   private void pollEvents() {
     scheduler.scheduleWithFixedDelay(
         () -> {
+          abortTimeoutEvent();
           saveUncompensatedEventsToCommands();
           compensate();
           updateCompensatedCommands();
-          deleteDuplicateSagaEndedEvents();
+          deleteDuplicateEvents();
+          updateTransactionStatus();
         },
         0,
         eventPollingInterval,
@@ -92,11 +96,11 @@ public class EventScanner implements Runnable {
         });
   }
 
-  private void deleteDuplicateSagaEndedEvents() {
+  private void deleteDuplicateEvents() {
     try {
-      eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
+      
eventRepository.deleteDuplicateEvents(Arrays.asList(TxAbortedEvent.name(), 
SagaEndedEvent.name()));
     } catch (Exception e) {
-      log.warn("Failed to delete duplicate SagaEndedEvent", e);
+      log.warn("Failed to delete duplicate event", e);
     }
   }
 
@@ -109,6 +113,19 @@ public class EventScanner implements Runnable {
     markSagaEnded(event);
   }
 
+  private void abortTimeoutEvent() {
+    eventRepository.findFirstTimeoutEventByIdGreaterThan(nextTimeoutEventId)
+    .ifPresent((TxEvent event) -> {
+      log.info("Found timeout event {}", event);
+      nextTimeoutEventId = event.id();
+      eventRepository.save(toTxAbortedEvent(event));
+    });
+  }
+
+  private void updateTransactionStatus() {
+    
eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEnd);
+  }
+
   private void markSagaEnded(TxEvent event) {
     if 
(commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
       markGlobalTxEnd(event);
@@ -120,16 +137,29 @@ public class EventScanner implements Runnable {
     log.info("Marked end of transaction with globalTxId {}", 
event.globalTxId());
   }
 
+  private TxEvent toTxAbortedEvent(TxEvent event) {
+    return new TxEvent(
+        event.serviceName(),
+        event.instanceId(),
+        event.globalTxId(),
+        event.localTxId(),
+        event.parentTxId(),
+        TxAbortedEvent.name(),
+        "",
+        null,
+        EMPTY_PAYLOAD);
+  }
+
   private TxEvent toSagaEndedEvent(TxEvent event) {
     return new TxEvent(
         event.serviceName(),
         event.instanceId(),
-        new Date(),
         event.globalTxId(),
         event.globalTxId(),
         null,
         SagaEndedEvent.name(),
         "",
+        null,
         EMPTY_PAYLOAD);
   }
 
@@ -153,6 +183,7 @@ public class EventScanner implements Runnable {
         command.parentTxId(),
         TxStartedEvent.name(),
         command.compensationMethod(),
+        null,
         command.payloads()
     );
   }
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 1364cb7..04d385b 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -33,6 +33,7 @@ public class TxEvent {
   private String serviceName;
   private String instanceId;
   private Date creationTime;
+  private Date expireTime;
   private String globalTxId;
   private String localTxId;
   private String parentTxId;
@@ -64,8 +65,9 @@ public class TxEvent {
       String parentTxId,
       String type,
       String compensationMethod,
+      Date expireTime,
       byte[] payloads) {
-    this(serviceName, instanceId, new Date(), globalTxId, localTxId, 
parentTxId, type, compensationMethod, payloads);
+    this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, 
parentTxId, type, compensationMethod, expireTime, payloads);
   }
 
   public TxEvent(
@@ -77,21 +79,25 @@ public class TxEvent {
       String parentTxId,
       String type,
       String compensationMethod,
+      Date expireTime,
       byte[] payloads) {
-    this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, 
parentTxId, type, compensationMethod, payloads);
+    this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, 
parentTxId, type, compensationMethod,
+        expireTime, payloads);
   }
 
-  public TxEvent(
-      long id,
+  TxEvent(Long surrogateId,
       String serviceName,
       String instanceId,
+      Date creationTime,
       String globalTxId,
       String localTxId,
       String parentTxId,
       String type,
       String compensationMethod,
+      int timeout,
       byte[] payloads) {
-    this(id, serviceName, instanceId, new Date(), globalTxId, localTxId, 
parentTxId, type, compensationMethod, payloads);
+    this(surrogateId, serviceName, instanceId, creationTime, globalTxId, 
localTxId, parentTxId, type,
+        compensationMethod, timeout == 0 ? null : new 
Date(creationTime.getTime() + timeout*1000), payloads);
   }
 
   TxEvent(Long surrogateId,
@@ -103,6 +109,7 @@ public class TxEvent {
       String parentTxId,
       String type,
       String compensationMethod,
+      Date expireTime,
       byte[] payloads) {
 
     this.surrogateId = surrogateId;
@@ -114,6 +121,7 @@ public class TxEvent {
     this.parentTxId = parentTxId;
     this.type = type;
     this.compensationMethod = compensationMethod;
+    this.expireTime = expireTime;
     this.payloads = payloads;
   }
 
@@ -149,6 +157,10 @@ public class TxEvent {
     return compensationMethod;
   }
 
+  public Date expireTime() {
+    return expireTime;
+  }
+
   public byte[] payloads() {
     return payloads;
   }
@@ -169,6 +181,7 @@ public class TxEvent {
         ", parentTxId='" + parentTxId + '\'' +
         ", type='" + type + '\'' +
         ", compensationMethod='" + compensationMethod + '\'' +
+        ", expireTime='" + expireTime + '\'' +
         '}';
   }
 }
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index b61aa06..ec564f9 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -23,11 +23,15 @@ import java.util.Optional;
 public interface TxEventRepository {
   void save(TxEvent event);
 
+  Optional<TxEvent> findFirstAbortedGlobalTransaction();
+
   List<TxEvent> findTransactions(String globalTxId, String type);
 
   List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String 
type);
 
   Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String 
type);
 
-  void deleteDuplicateEvents(String type);
+  Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id);
+
+  void deleteDuplicateEvents(List<String> types);
 }
diff --git 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
index 5cda4c5..b201ebe 100644
--- 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
+++ 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
@@ -166,6 +166,7 @@ public class CompositeOmegaCallbackTest {
         UUID.randomUUID().toString(),
         eventType.name(),
         getClass().getCanonicalName(),
+        null,
         uniquify("blah").getBytes());
   }
 }
diff --git 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 231d5bf..a73c754 100644
--- 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -49,6 +49,11 @@ public class TxConsistentServiceTest {
     }
 
     @Override
+    public Optional<TxEvent> findFirstAbortedGlobalTransaction() {
+      return Optional.empty();
+    }
+
+    @Override
     public List<TxEvent> findTransactions(String globalTxId, String type) {
       return events.stream()
           .filter(event -> globalTxId.equals(event.globalTxId()) && 
type.equals(event.type()))
@@ -66,7 +71,12 @@ public class TxConsistentServiceTest {
     }
 
     @Override
-    public void deleteDuplicateEvents(String type) {
+    public Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id) {
+      return Optional.empty();
+    }
+
+    @Override
+    public void deleteDuplicateEvents(List<String> types) {
     }
   };
 
@@ -111,7 +121,7 @@ public class TxConsistentServiceTest {
   }
 
   private TxEvent newEvent(EventType eventType) {
-    return new TxEvent(serviceName, instanceId, new Date(), globalTxId, 
localTxId, parentTxId, eventType.name(), compensationMethod, payloads);
+    return new TxEvent(serviceName, instanceId, new Date(), globalTxId, 
localTxId, parentTxId, eventType.name(), compensationMethod, null, payloads);
   }
 
   private TxEvent eventOf(EventType eventType, String localTxId) {
@@ -121,6 +131,7 @@ public class TxConsistentServiceTest {
         UUID.randomUUID().toString(),
         eventType.name(),
         compensationMethod,
+        null,
         payloads);
   }
 }
diff --git 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
index 68c33a9..4b65528 100644
--- 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
+++ 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
@@ -17,9 +17,8 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import org.apache.servicecomb.saga.common.EventType;
-
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.util.Date;
 import java.util.UUID;
@@ -33,8 +32,9 @@ class TxEventMaker {
         uniquify("globalTxId"),
         uniquify("localTxId"),
         UUID.randomUUID().toString(),
-        EventType.TxStartedEvent.name(),
+        TxStartedEvent.name(),
         TxEventMaker.class.getCanonicalName(),
+        null,
         uniquify("blah").getBytes());
   }
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index eced7f9..679b6ba 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -56,7 +56,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
   public void onConnected(GrpcServiceConfig request, 
StreamObserver<GrpcCompensateCommand> responseObserver) {
     omegaCallbacks
         .computeIfAbsent(request.getServiceName(), key -> new 
ConcurrentHashMap<>())
-        .computeIfAbsent(request.getInstanceId(), key -> new 
GrpcOmegaCallback(responseObserver));
+        .put(request.getInstanceId(), new GrpcOmegaCallback(responseObserver));
   }
 
   // TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback 
may not be registered on disconnected
@@ -75,15 +75,18 @@ class GrpcTxEventEndpointImpl extends 
TxEventServiceImplBase {
 
   @Override
   public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> 
responseObserver) {
+    Date date = new Date(message.getTimestamp());
+    int timeout = message.getTimeout();
     boolean ok = txConsistentService.handle(new TxEvent(
         message.getServiceName(),
         message.getInstanceId(),
-        new Date(message.getTimestamp()),
+        date,
         message.getGlobalTxId(),
         message.getLocalTxId(),
         message.getParentTxId().isEmpty() ? null : message.getParentTxId(),
         message.getType(),
         message.getCompensationMethod(),
+        timeout == 0 ? null : new Date(date.getTime() + timeout),
         message.getPayloads().toByteArray()
     ));
 
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index ad32148..c3e8f04 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -37,6 +37,11 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
+  public Optional<TxEvent> findFirstAbortedGlobalTransaction() {
+    return eventRepo.findFirstAbortedGlobalTxByType();
+  }
+
+  @Override
   public List<TxEvent> findTransactions(String globalTxId, String type) {
     return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type);
   }
@@ -52,7 +57,12 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
-  public void deleteDuplicateEvents(String type) {
-    eventRepo.deleteByType(type);
+  public Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id) {
+    return eventRepo.findFirstTimeoutSurrogateIdGreaterThan(id);
+  }
+
+  @Override
+  public void deleteDuplicateEvents(List<String> types) {
+    eventRepo.deleteByTypes(types);
   }
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 2e52fef..e46b264 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -31,8 +31,16 @@ import org.springframework.data.repository.CrudRepository;
 interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
   List<TxEvent> findByGlobalTxId(String globalTxId);
 
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.type = 'TxAbortedEvent' AND NOT EXISTS( "
+      + "  SELECT t1.globalTxId FROM TxEvent t1"
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "    AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent'))")
+  Optional<TxEvent> findFirstAbortedGlobalTxByType();
+
   @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
-      + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, 
t.type, t.compensationMethod, t.payloads"
+      + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, 
"
+      + "t.type, t.compensationMethod, t.expireTime, t.payloads"
       + ") FROM TxEvent t "
       + "WHERE t.globalTxId = ?1 AND t.type = ?2")
   List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String 
type);
@@ -70,13 +78,26 @@ interface TxEventEnvelopeRepository extends 
CrudRepository<TxEvent, Long> {
 
   Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long 
surrogateId);
 
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.type IN ('TxStartedEvent', 'SagaStartedEvent') "
+      + "  AND t.expireTime IS NOT NULL "
+      + "  AND t.expireTime < CURRENT_TIMESTAMP "
+      + "  AND t.surrogateId > ?1 AND NOT EXISTS ("
+      + "  SELECT t1.globalTxId"
+      + "  FROM TxEvent t1 "
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "  AND t1.localTxId = t.localTxId "
+      + "  AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent')) "
+      + "ORDER BY t.surrogateId ASC")
+  Optional<TxEvent> findFirstTimeoutSurrogateIdGreaterThan(long surrogateId);
+
   @Transactional
   @Modifying(clearAutomatically = true)
   @Query("DELETE FROM TxEvent t "
-      + "WHERE t.type = ?1 AND t.surrogateId NOT IN ("
+      + "WHERE t.type IN ?1 AND t.surrogateId NOT IN ("
       + " SELECT MAX(t1.surrogateId) FROM TxEvent t1 "
-      + " WHERE t1.type = ?1"
+      + " WHERE t1.type = t.type"
       + " GROUP BY t1.globalTxId"
       + ")")
-  void deleteByType(String type);
+  void deleteByTypes(List<String> types);
 }
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql 
b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index d6b5172..a3aee1c 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -3,6 +3,7 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   serviceName varchar(16) NOT NULL,
   instanceId varchar(36) NOT NULL,
   creationTime timestamp(6) NOT NULL DEFAULT CURRENT_DATE,
+  expireTime timestamp(6) NULL,
   globalTxId varchar(36) NOT NULL,
   localTxId varchar(36) NOT NULL,
   parentTxId varchar(36) DEFAULT NULL,
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 539f610..585e755 100644
--- 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.alpha.server;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
@@ -367,6 +368,34 @@ public class AlphaIntegrationTest {
     });
   }
 
+  @Test
+  public void abortTimeoutSagaStartedEvent() {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, 
globalTxId, null, 1));
+
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3);
+
+    List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+    assertThat(events.get(0).type(), is(SagaStartedEvent.name()));
+    assertThat(events.get(1).type(), is(TxAbortedEvent.name()));
+    assertThat(events.get(2).type(), is(SagaEndedEvent.name()));
+  }
+
+  @Test
+  public void abortTimeoutTxStartedEvent() {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, 
globalTxId));
+    blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, 
globalTxId, 1));
+
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4);
+
+    List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+    assertThat(events.get(0).type(), is(SagaStartedEvent.name()));
+    assertThat(events.get(1).type(), is(TxStartedEvent.name()));
+    assertThat(events.get(2).type(), is(TxAbortedEvent.name()));
+    assertThat(events.get(3).type(), is(SagaEndedEvent.name()));
+  }
+
   private GrpcAck onCompensation(GrpcCompensateCommand command) {
     return blockingStub.onTxEvent(
         eventOf(TxCompensatedEvent,
@@ -393,9 +422,14 @@ public class AlphaIntegrationTest {
         parentTxId,
         TxAbortedEvent.name(),
         compensationMethod,
+        null,
         payload.getBytes());
   }
 
+  private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String 
localTxId, String parentTxId, int timeout) {
+    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), getClass().getCanonicalName(), timeout);
+  }
+
   private GrpcTxEvent someGrpcEvent(EventType type) {
     return eventOf(type, localTxId, parentTxId, payload.getBytes(), 
getClass().getCanonicalName());
   }
@@ -405,11 +439,11 @@ public class AlphaIntegrationTest {
   }
 
   private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String 
localTxId) {
-    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), getClass().getCanonicalName());
+    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), getClass().getCanonicalName(), 0);
   }
 
   private GrpcTxEvent eventOf(EventType eventType, String localTxId, String 
parentTxId, byte[] payloads, String compensationMethod) {
-    return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, 
compensationMethod);
+    return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, 
compensationMethod, 0);
   }
 
   private GrpcTxEvent eventOf(EventType eventType,
@@ -417,7 +451,8 @@ public class AlphaIntegrationTest {
       String localTxId,
       String parentTxId,
       byte[] payloads,
-      String compensationMethod) {
+      String compensationMethod,
+      int timeout) {
 
     return GrpcTxEvent.newBuilder()
         .setServiceName(serviceName)
@@ -428,6 +463,7 @@ public class AlphaIntegrationTest {
         .setParentTxId(parentTxId == null ? "" : parentTxId)
         .setType(eventType.name())
         .setCompensationMethod(compensationMethod)
+        .setTimeout(timeout)
         .setPayloads(ByteString.copyFrom(payloads))
         .build();
   }
diff --git a/alpha/alpha-server/src/test/resources/schema.sql 
b/alpha/alpha-server/src/test/resources/schema.sql
index 344fdda..53bcd1e 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -3,6 +3,7 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   serviceName varchar(36) NOT NULL,
   instanceId varchar(36) NOT NULL,
   creationTime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
+  expireTime TIMESTAMP NULL,
   globalTxId varchar(36) NOT NULL,
   localTxId varchar(36) NOT NULL,
   parentTxId varchar(36) DEFAULT NULL,
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 8062ae9..bb24c5c 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -97,7 +97,8 @@ public class LoadBalancedClusterMessageSenderTest {
   private final String localTxId = uniquify("localTxId");
   private final String parentTxId = uniquify("parentTxId");
   private final String compensationMethod = getClass().getCanonicalName();
-  private final TxEvent event = new TxEvent(EventType.TxStartedEvent, 
globalTxId, localTxId, parentTxId, compensationMethod, "blah");
+  private final TxEvent event = new TxEvent(EventType.TxStartedEvent, 
globalTxId, localTxId, parentTxId,
+      compensationMethod, 0, "blah");
 
   private final String serviceName = uniquify("serviceName");
   private final String[] addresses = {"localhost:8080", "localhost:8090"};
@@ -299,7 +300,7 @@ public class LoadBalancedClusterMessageSenderTest {
   public void forwardSendResult() {
     assertThat(messageSender.send(event).aborted(), is(false));
 
-    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, 
parentTxId, "reject", "blah");
+    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, 
parentTxId, "reject", 0, "blah");
     assertThat(messageSender.send(rejectEvent).aborted(), is(true));
   }
 
@@ -356,6 +357,7 @@ public class LoadBalancedClusterMessageSenderTest {
           request.getLocalTxId(),
           request.getParentTxId(),
           request.getCompensationMethod(),
+          0,
           new String(request.getPayloads().toByteArray())));
 
       sleep();
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 562c50f..95bda85 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -42,7 +42,7 @@ public class RetryableMessageSenderTest {
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
-  private final TxStartedEvent event = new TxStartedEvent(globalTxId, 
localTxId, null, "method x");
+  private final TxStartedEvent event = new TxStartedEvent(globalTxId, 
localTxId, null, "method x", 0);
 
   @Test
   public void sendEventWhenSenderIsAvailable() {
@@ -56,7 +56,7 @@ public class RetryableMessageSenderTest {
 
   @Test
   public void blowsUpWhenEventIsSagaStarted() {
-    TxEvent event = new SagaStartedEvent(globalTxId, localTxId);
+    TxEvent event = new SagaStartedEvent(globalTxId, localTxId, 0);
 
     try {
       messageSender.send(event);
diff --git 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 7daf954..9fd2a7e 100644
--- 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -131,7 +131,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
@@ -152,7 +152,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, illegalUser).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, illegalUser).toString(),
             new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, throwable).toString()},
         toArray(messages)
     );
@@ -174,9 +174,9 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, anotherUser).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, 0, anotherUser).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod).toString()
@@ -196,9 +196,9 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
@@ -215,9 +215,9 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
@@ -237,7 +237,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
@@ -255,7 +255,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[] {
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 074a5ec..53e5158 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -29,9 +29,9 @@ class CompensableInterceptor implements EventAwareInterceptor 
{
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, Object... message) {
-    return sender
-        .send(new TxStartedEvent(context.globalTxId(), context.localTxId(), 
parentTxId, compensationMethod, message));
+  public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, int timeout, Object... message) {
+    return sender.send(new TxStartedEvent(
+        context.globalTxId(), context.localTxId(), parentTxId, 
compensationMethod, timeout, message));
   }
 
   @Override
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index 4425949..7b71dd4 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
 public interface EventAwareInterceptor {
   EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
     @Override
-    public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, Object... message) {
+    public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, int timeout, Object... message) {
       return new AlphaResponse(false);
     }
 
@@ -33,7 +33,7 @@ public interface EventAwareInterceptor {
     }
   };
 
-  AlphaResponse preIntercept(String parentTxId, String compensationMethod, 
Object... message);
+  AlphaResponse preIntercept(String parentTxId, String compensationMethod, int 
timeout, Object... message);
 
   void postIntercept(String parentTxId, String compensationMethod) throws 
Throwable;
 
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
index 7074d8f..8c70e3a 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
@@ -22,6 +22,6 @@ import org.apache.servicecomb.saga.common.EventType;
 public class SagaEndedEvent extends TxEvent {
 
   public SagaEndedEvent(String globalTxId, String localTxId) {
-    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "");
+    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0);
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 7ef021a..d3d55fe 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -32,9 +32,9 @@ class SagaStartAnnotationProcessor implements 
EventAwareInterceptor {
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, Object... message) {
+  public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, int timeout, Object... message) {
     try {
-      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), 
omegaContext.localTxId()));
+      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), 
omegaContext.localTxId(), timeout));
     } catch (OmegaException e) {
       throw new TransactionalException(e.getMessage(), e.getCause());
     }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 0951752..db8e3a0 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -52,7 +52,7 @@ public class SagaStartAspect {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
     TimeAwareInterceptor interceptor = new 
TimeAwareInterceptor(sagaStartAnnotationProcessor);
-    interceptor.preIntercept(context.globalTxId(), method.toString());
+    interceptor.preIntercept(context.globalTxId(), method.toString(), 
sagaStart.timeout());
     LOG.debug("Initialized context {} before execution of method {}", context, 
method.toString());
 
     scheduleTimeoutTask(interceptor, method, sagaStart.timeout());
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
index 54f61e4..cb76a26 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
@@ -20,9 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction;
 import org.apache.servicecomb.saga.common.EventType;
 
 public class SagaStartedEvent extends TxEvent {
-
-  public SagaStartedEvent(String globalTxId, String localTxId) {
+  public SagaStartedEvent(String globalTxId, String localTxId, int timeout) {
     // use "" instead of null as compensationMethod requires not null in sql
-    super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "");
+    super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", 
timeout);
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index 2057fbc..9de26d2 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -30,8 +30,8 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String signature, 
Object... args) {
-    return interceptor.preIntercept(parentTxId, signature, args);
+  public AlphaResponse preIntercept(String parentTxId, String signature, int 
timeout, Object... args) {
+    return interceptor.preIntercept(parentTxId, signature, timeout, args);
   }
 
   @Override
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 5a61dc7..718d1fd 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -59,7 +59,7 @@ public class TransactionAspect {
     context.newLocalTxId();
 
     TimeAwareInterceptor interceptor = new 
TimeAwareInterceptor(this.interceptor);
-    AlphaResponse response = interceptor.preIntercept(localTxId, signature, 
joinPoint.getArgs());
+    AlphaResponse response = interceptor.preIntercept(localTxId, signature, 
compensable.timeout(), joinPoint.getArgs());
     if (response.aborted()) {
       String abortedLocalTxId = context.localTxId();
       context.setLocalTxId(localTxId);
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
index 13df2f7..d6aa533 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
@@ -17,14 +17,14 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import org.apache.servicecomb.saga.common.EventType;
-
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
+import org.apache.servicecomb.saga.common.EventType;
+
 public class TxAbortedEvent extends TxEvent {
   public TxAbortedEvent(String globalTxId, String localTxId, String 
parentTxId, String compensationMethod, Throwable throwable) {
-    super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, stackTrace(throwable));
+    super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, 0, stackTrace(throwable));
   }
 
   private static String stackTrace(Throwable e) {
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
index dbbaeab..8e288df 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.saga.common.EventType;
 
 public class TxCompensatedEvent extends TxEvent {
   public TxCompensatedEvent(String globalTxId, String localTxId, String 
parentTxId, String compensationMethod) {
-    super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod);
+    super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, 0);
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
index 4e587c8..8d6666a 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.saga.common.EventType;
 
 public class TxEndedEvent extends TxEvent {
   public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, 
String compensationMethod) {
-    super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod);
+    super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, 0);
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
index 1398d3e..34be420 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
@@ -17,10 +17,10 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import org.apache.servicecomb.saga.common.EventType;
-
 import java.util.Arrays;
 
+import org.apache.servicecomb.saga.common.EventType;
+
 public class TxEvent {
 
   private final long timestamp;
@@ -29,9 +29,11 @@ public class TxEvent {
   private final String localTxId;
   private final String parentTxId;
   private final String compensationMethod;
+  private final int timeout;
   private final Object[] payloads;
 
-  public TxEvent(EventType type, String globalTxId, String localTxId, String 
parentTxId, String compensationMethod, Object... payloads) {
+  public TxEvent(EventType type, String globalTxId, String localTxId, String 
parentTxId, String compensationMethod,
+      int timeout, Object... payloads) {
     this.timestamp = System.currentTimeMillis();
     this.type = type;
     this.localTxId = localTxId;
@@ -39,6 +41,7 @@ public class TxEvent {
     this.compensationMethod = compensationMethod;
     this.payloads = payloads;
     this.globalTxId = globalTxId;
+    this.timeout = timeout;
   }
 
   public long timestamp() {
@@ -69,6 +72,10 @@ public class TxEvent {
     return compensationMethod;
   }
 
+  public int timeout() {
+    return timeout;
+  }
+
   @Override
   public String toString() {
     return type.name() + "{" +
@@ -76,6 +83,7 @@ public class TxEvent {
         ", localTxId='" + localTxId + '\'' +
         ", parentTxId='" + parentTxId + '\'' +
         ", compensationMethod='" + compensationMethod + '\'' +
+        ", timeout='" + timeout + '\'' +
         ", payloads=" + Arrays.toString(payloads) +
         '}';
   }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
index ce93ea3..4732d95 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -21,7 +21,8 @@ import org.apache.servicecomb.saga.common.EventType;
 
 public class TxStartedEvent extends TxEvent {
 
-  public TxStartedEvent(String globalTxId, String localTxId, String 
parentTxId, String compensationMethod, Object... payloads) {
-    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, payloads);
+  public TxStartedEvent(String globalTxId, String localTxId, String parentTxId,
+      String compensationMethod, int timeout, Object... payloads) {
+    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, timeout, payloads);
   }
 }
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 21af7e6..0ef9d4d 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -62,7 +62,7 @@ public class CompensableInterceptorTest {
 
   @Test
   public void sendsTxStartedEventBefore() throws Exception {
-    interceptor.preIntercept(parentTxId, compensationMethod, message);
+    interceptor.preIntercept(parentTxId, compensationMethod, 0, message);
 
     TxEvent event = messages.get(0);
 
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index 566a456..cc84fc5 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -65,7 +65,7 @@ public class SagaStartAnnotationProcessorTest {
 
   @Test
   public void sendsSagaStartedEvent() {
-    sagaStartAnnotationProcessor.preIntercept(null, null);
+    sagaStartAnnotationProcessor.preIntercept(null, null, 0);
 
     TxEvent event = messages.get(0);
 
@@ -99,7 +99,7 @@ public class SagaStartAnnotationProcessorTest {
     doThrow(exception).when(sender).send(any());
 
     try {
-      sagaStartAnnotationProcessor.preIntercept(null, null);
+      sagaStartAnnotationProcessor.preIntercept(null, null, 0);
       expectFailing(TransactionalException.class);
     } catch (TransactionalException e) {
       assertThat(e.getMessage(), is("exception"));
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index 1136a45..0f2d2eb 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -46,7 +46,7 @@ public class TimeAwareInterceptorTest {
 
   private final EventAwareInterceptor underlying = new EventAwareInterceptor() 
{
     @Override
-    public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, Object... message) {
+    public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, int timeout, Object... message) {
       return new AlphaResponse(false);
     }
 
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto 
b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 2636881..3944eee 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -46,6 +46,7 @@ message GrpcTxEvent {
   bytes payloads = 7;
   string serviceName = 8;
   string instanceId = 9;
+  int32 timeout = 10;
 }
 
 message GrpcCompensateCommand {

-- 
To stop receiving notification emails like this one, please contact
ningji...@apache.org.

Reply via email to