WillemJiang closed pull request #122: SCB-239 omega stateless
URL: https://github.com/apache/incubator-servicecomb-saga/pull/122
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub does not show its diff
after the merge, so the full diff is reproduced below:

diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 49c17560..1e6f21be 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -17,7 +17,7 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
 
 import java.util.Date;
 
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 2d51a74f..a52ebe5a 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -18,13 +18,14 @@
 package org.apache.servicecomb.saga.alpha.core;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
-import java.util.Date;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.slf4j.Logger;
@@ -37,6 +38,7 @@
   private final ScheduledExecutorService scheduler;
   private final TxEventRepository eventRepository;
   private final CommandRepository commandRepository;
+  private final TxTimeoutRepository timeoutRepository;
   private final OmegaCallback omegaCallback;
   private final int eventPollingInterval;
 
@@ -46,12 +48,13 @@
   public EventScanner(ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
+      TxTimeoutRepository timeoutRepository,
       OmegaCallback omegaCallback,
       int eventPollingInterval) {
-
     this.scheduler = scheduler;
     this.eventRepository = eventRepository;
     this.commandRepository = commandRepository;
+    this.timeoutRepository = timeoutRepository;
     this.omegaCallback = omegaCallback;
     this.eventPollingInterval = eventPollingInterval;
   }
@@ -64,16 +67,32 @@ public void run() {
   private void pollEvents() {
     scheduler.scheduleWithFixedDelay(
         () -> {
+          updateTimeoutStatus();
+          findTimeoutEvents();
+          abortTimeoutEvents();
           saveUncompensatedEventsToCommands();
           compensate();
           updateCompensatedCommands();
           deleteDuplicateSagaEndedEvents();
+          updateTransactionStatus();
         },
         0,
         eventPollingInterval,
         MILLISECONDS);
   }
 
+  private void findTimeoutEvents() {
+    eventRepository.findTimeoutEvents()
+        .forEach(event -> {
+          log.info("Found timeout event {}", event);
+          timeoutRepository.save(txTimeoutOf(event));
+        });
+  }
+
+  private void updateTimeoutStatus() {
+    timeoutRepository.markTimeoutAsDone();
+  }
+
   private void saveUncompensatedEventsToCommands() {
     
eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, 
TxEndedEvent.name())
         .forEach(event -> {
@@ -96,7 +115,7 @@ private void deleteDuplicateSagaEndedEvents() {
     try {
       eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
     } catch (Exception e) {
-      log.warn("Failed to delete duplicate SagaEndedEvent", e);
+      log.warn("Failed to delete duplicate event", e);
     }
   }
 
@@ -109,6 +128,23 @@ private void updateCompensationStatus(TxEvent event) {
     markSagaEnded(event);
   }
 
+  private void abortTimeoutEvents() {
+    timeoutRepository.findFirstTimeout().forEach(timeout -> {
+      log.info("Found timeout event {} to abort", timeout);
+
+      eventRepository.save(toTxAbortedEvent(timeout));
+
+      if (timeout.type().equals(TxStartedEvent.name())) {
+        eventRepository.findTxStartedEventToCompensate(timeout.globalTxId(), 
timeout.localTxId())
+            .ifPresent(omegaCallback::compensate);
+      }
+    });
+  }
+
+  private void updateTransactionStatus() {
+    
eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEnd);
+  }
+
   private void markSagaEnded(TxEvent event) {
     if 
(commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
       markGlobalTxEnd(event);
@@ -120,11 +156,22 @@ private void markGlobalTxEnd(TxEvent event) {
     log.info("Marked end of transaction with globalTxId {}", 
event.globalTxId());
   }
 
+  private TxEvent toTxAbortedEvent(TxTimeout timeout) {
+    return new TxEvent(
+        timeout.serviceName(),
+        timeout.instanceId(),
+        timeout.globalTxId(),
+        timeout.localTxId(),
+        timeout.parentTxId(),
+        TxAbortedEvent.name(),
+        "",
+        ("Transaction timeout").getBytes());
+  }
+
   private TxEvent toSagaEndedEvent(TxEvent event) {
     return new TxEvent(
         event.serviceName(),
         event.instanceId(),
-        new Date(),
         event.globalTxId(),
         event.globalTxId(),
         null,
@@ -156,4 +203,18 @@ private TxEvent txStartedEventOf(Command command) {
         command.payloads()
     );
   }
+
+  private TxTimeout txTimeoutOf(TxEvent event) {
+    return new TxTimeout(
+        event.id(),
+        event.serviceName(),
+        event.instanceId(),
+        event.globalTxId(),
+        event.localTxId(),
+        event.parentTxId(),
+        event.type(),
+        event.expiryTime(),
+        NEW.name()
+    );
+  }
 }
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TaskStatus.java
similarity index 96%
rename from 
alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
rename to 
alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TaskStatus.java
index 0c9b78b8..442213b3 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TaskStatus.java
@@ -17,7 +17,7 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-public enum CommandStatus {
+public enum TaskStatus {
   NEW,
   PENDING,
   DONE
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 1364cb79..42a202fd 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -17,15 +17,21 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.util.Date;
 
 import javax.persistence.Entity;
 import javax.persistence.GeneratedValue;
 import javax.persistence.GenerationType;
 import javax.persistence.Id;
+import javax.persistence.Transient;
 
 @Entity
 public class TxEvent {
+  @Transient
+  private static final long MAX_TIMESTAMP = 253402214400000L; // 9999-12-31 
00:00:00
+
   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long surrogateId;
@@ -38,6 +44,7 @@
   private String parentTxId;
   private String type;
   private String compensationMethod;
+  private Date expiryTime;
   private byte[] payloads;
 
   private TxEvent() {
@@ -53,6 +60,7 @@ public TxEvent(TxEvent event) {
         event.parentTxId,
         event.type,
         event.compensationMethod,
+        event.expiryTime,
         event.payloads);
   }
 
@@ -65,33 +73,36 @@ public TxEvent(
       String type,
       String compensationMethod,
       byte[] payloads) {
-    this(serviceName, instanceId, new Date(), globalTxId, localTxId, 
parentTxId, type, compensationMethod, payloads);
+    this(serviceName, instanceId, new Date(), globalTxId, localTxId, 
parentTxId, type, compensationMethod, 0, payloads);
   }
 
   public TxEvent(
       String serviceName,
       String instanceId,
-      Date creationTime,
       String globalTxId,
       String localTxId,
       String parentTxId,
       String type,
       String compensationMethod,
+      int timeout,
       byte[] payloads) {
-    this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, 
parentTxId, type, compensationMethod, payloads);
+    this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, 
parentTxId, type, compensationMethod, timeout,
+        payloads);
   }
 
   public TxEvent(
-      long id,
       String serviceName,
       String instanceId,
+      Date creationTime,
       String globalTxId,
       String localTxId,
       String parentTxId,
       String type,
       String compensationMethod,
+      int timeout,
       byte[] payloads) {
-    this(id, serviceName, instanceId, new Date(), globalTxId, localTxId, 
parentTxId, type, compensationMethod, payloads);
+    this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, 
parentTxId, type, compensationMethod,
+        timeout, payloads);
   }
 
   TxEvent(Long surrogateId,
@@ -103,8 +114,25 @@ public TxEvent(
       String parentTxId,
       String type,
       String compensationMethod,
+      int timeout,
       byte[] payloads) {
+    this(surrogateId, serviceName, instanceId, creationTime, globalTxId, 
localTxId, parentTxId, type,
+        compensationMethod,
+        timeout == 0 ? new Date(MAX_TIMESTAMP) : new 
Date(creationTime.getTime() + SECONDS.toMillis(timeout)),
+        payloads);
+  }
 
+  TxEvent(Long surrogateId,
+      String serviceName,
+      String instanceId,
+      Date creationTime,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String type,
+      String compensationMethod,
+      Date expiryTime,
+      byte[] payloads) {
     this.surrogateId = surrogateId;
     this.serviceName = serviceName;
     this.instanceId = instanceId;
@@ -114,6 +142,7 @@ public TxEvent(
     this.parentTxId = parentTxId;
     this.type = type;
     this.compensationMethod = compensationMethod;
+    this.expiryTime = expiryTime;
     this.payloads = payloads;
   }
 
@@ -157,6 +186,10 @@ public long id() {
     return surrogateId;
   }
 
+  public Date expiryTime() {
+    return expiryTime;
+  }
+
   @Override
   public String toString() {
     return "TxEvent{" +
@@ -169,6 +202,7 @@ public String toString() {
         ", parentTxId='" + parentTxId + '\'' +
         ", type='" + type + '\'' +
         ", compensationMethod='" + compensationMethod + '\'' +
+        ", expiryTime='" + expiryTime + '\'' +
         '}';
   }
 }
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index b61aa069..0af6fb5f 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -23,6 +23,12 @@
 public interface TxEventRepository {
   void save(TxEvent event);
 
+  Optional<TxEvent> findFirstAbortedGlobalTransaction();
+
+  List<TxEvent> findTimeoutEvents();
+
+  Optional<TxEvent> findTxStartedEventToCompensate(String globalTxId, String 
localTxId);
+
   List<TxEvent> findTransactions(String globalTxId, String type);
 
   List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String 
type);
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
new file mode 100644
index 00000000..00ca2ec7
--- /dev/null
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.core;
+
+import java.util.Date;
+
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Version;
+
+@Entity
+public class TxTimeout {
+  @Id
+  @GeneratedValue(strategy = GenerationType.IDENTITY)
+  private Long surrogateId;
+
+  private long eventId;
+  private String serviceName;
+  private String instanceId;
+  private String globalTxId;
+  private String localTxId;
+  private String parentTxId;
+  private String type;
+  private Date expiryTime;
+  private String status;
+
+  @Version
+  private long version;
+
+  TxTimeout() {
+  }
+
+  TxTimeout(long eventId, String serviceName, String instanceId, String 
globalTxId, String localTxId,
+      String parentTxId, String type, Date expiryTime, String status) {
+    this.eventId = eventId;
+    this.serviceName = serviceName;
+    this.instanceId = instanceId;
+    this.globalTxId = globalTxId;
+    this.localTxId = localTxId;
+    this.parentTxId = parentTxId;
+    this.type = type;
+    this.expiryTime = expiryTime;
+    this.status = status;
+  }
+
+  public String serviceName() {
+    return serviceName;
+  }
+
+  public String instanceId() {
+    return instanceId;
+  }
+
+  public String globalTxId() {
+    return globalTxId;
+  }
+
+  public String localTxId() {
+    return localTxId;
+  }
+
+  public String parentTxId() {
+    return parentTxId;
+  }
+
+  public String type() {
+    return type;
+  }
+
+  public Date expiryTime() {
+    return expiryTime;
+  }
+
+  public String status() {
+    return status;
+  }
+
+  @Override
+  public String toString() {
+    return "TxTimeout{" +
+        "eventId=" + eventId +
+        ", serviceName='" + serviceName + '\'' +
+        ", instanceId='" + instanceId + '\'' +
+        ", globalTxId='" + globalTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        ", parentTxId='" + parentTxId + '\'' +
+        ", type='" + type + '\'' +
+        ", expiryTime=" + expiryTime +
+        ", status=" + status +
+        '}';
+  }
+}
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
similarity index 79%
rename from 
omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
rename to 
alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
index eb820d61..97387a36 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
@@ -15,10 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.saga.omega.transaction;
+package org.apache.servicecomb.saga.alpha.core;
 
-public class OmegaTxTimeoutException extends RuntimeException {
-  public OmegaTxTimeoutException(String cause) {
-    super(cause);
-  }
+import java.util.List;
+
+public interface TxTimeoutRepository {
+  void save(TxTimeout timeout);
+
+  void markTimeoutAsDone();
+
+  List<TxTimeout> findFirstTimeout();
 }
diff --git 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
index 5cda4c54..4ded48af 100644
--- 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
+++ 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
@@ -28,7 +28,6 @@
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
-import java.util.Date;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -160,7 +159,6 @@ private TxEvent eventOf(String serviceName, String 
instanceId, EventType eventTy
     return new TxEvent(
         serviceName,
         instanceId,
-        new Date(),
         uniquify("globalTxId"),
         uniquify("localTxId"),
         UUID.randomUUID().toString(),
diff --git 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 231d5bff..d2209940 100644
--- 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -29,7 +29,6 @@
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
-import java.util.Date;
 import java.util.Deque;
 import java.util.List;
 import java.util.Optional;
@@ -38,6 +37,7 @@
 import java.util.stream.Collectors;
 
 import org.apache.servicecomb.saga.common.EventType;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TxConsistentServiceTest {
@@ -48,6 +48,23 @@ public void save(TxEvent event) {
       events.add(event);
     }
 
+    @Override
+    public Optional<TxEvent> findFirstAbortedGlobalTransaction() {
+      return Optional.empty();
+    }
+
+    @Override
+    public List<TxEvent> findTimeoutEvents() {
+      return emptyList();
+    }
+
+    @Override
+    public Optional<TxEvent> findTxStartedEventToCompensate(String globalTxId, 
String localTxId) {
+      return events.stream()
+          .filter(event -> globalTxId.equals(event.globalTxId()) && 
localTxId.equals(event.localTxId()))
+          .findFirst();
+    }
+
     @Override
     public List<TxEvent> findTransactions(String globalTxId, String type) {
       return events.stream()
@@ -81,6 +98,11 @@ public void deleteDuplicateEvents(String type) {
   private final TxConsistentService consistentService = new 
TxConsistentService(eventRepository);
   private final byte[] payloads = "yeah".getBytes();
 
+  @Before
+  public void setUp() throws Exception {
+    events.clear();
+  }
+
   @Test
   public void persistEventOnArrival() throws Exception {
     TxEvent[] events = {
@@ -111,11 +133,13 @@ public void skipTxStartedEvent_IfGlobalTxAlreadyFailed() {
   }
 
   private TxEvent newEvent(EventType eventType) {
-    return new TxEvent(serviceName, instanceId, new Date(), globalTxId, 
localTxId, parentTxId, eventType.name(), compensationMethod, payloads);
+    return new TxEvent(serviceName, instanceId, globalTxId, localTxId, 
parentTxId, eventType.name(), compensationMethod,
+        payloads);
   }
 
   private TxEvent eventOf(EventType eventType, String localTxId) {
-    return new TxEvent(serviceName, instanceId, new Date(),
+    return new TxEvent(serviceName,
+        instanceId,
         globalTxId,
         localTxId,
         UUID.randomUUID().toString(),
diff --git 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
index 68c33a91..c14ffd9e 100644
--- 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
+++ 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
@@ -17,11 +17,9 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import org.apache.servicecomb.saga.common.EventType;
-
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
-import java.util.Date;
 import java.util.UUID;
 
 class TxEventMaker {
@@ -29,11 +27,10 @@ static TxEvent someEvent() {
     return new TxEvent(
         uniquify("serviceName"),
         uniquify("instanceId"),
-        new Date(),
         uniquify("globalTxId"),
         uniquify("localTxId"),
         UUID.randomUUID().toString(),
-        EventType.TxStartedEvent.name(),
+        TxStartedEvent.name(),
         TxEventMaker.class.getCanonicalName(),
         uniquify("blah").getBytes());
   }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index a4314377..6889c9f2 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -35,6 +35,7 @@
 import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
+import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
 import org.springframework.context.annotation.Bean;
@@ -69,6 +70,11 @@ CommandRepository 
springCommandRepository(TxEventEnvelopeRepository eventRepo, C
     return new SpringCommandRepository(eventRepo, commandRepository);
   }
 
+  @Bean
+  TxTimeoutRepository springTxTimeoutRepository(TxTimeoutEntityRepository 
timeoutRepo) {
+    return new SpringTxTimeoutRepository(timeoutRepo);
+  }
+
   @Bean
   ScheduledExecutorService compensationScheduler() {
     return scheduler;
@@ -80,14 +86,13 @@ TxConsistentService 
txConsistentService(@Value("${alpha.server.port:8080}") int
       ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
+      TxTimeoutRepository timeoutRepository,
       OmegaCallback omegaCallback,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
 
     new EventScanner(scheduler,
-        eventRepository,
-        commandRepository,
-        omegaCallback,
-        eventPollingInterval).run();
+        eventRepository, commandRepository, timeoutRepository,
+        omegaCallback, eventPollingInterval).run();
 
     TxConsistentService consistentService = new 
TxConsistentService(eventRepository);
 
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index eced7f9f..ee7e2e44 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -56,7 +56,7 @@
   public void onConnected(GrpcServiceConfig request, 
StreamObserver<GrpcCompensateCommand> responseObserver) {
     omegaCallbacks
         .computeIfAbsent(request.getServiceName(), key -> new 
ConcurrentHashMap<>())
-        .computeIfAbsent(request.getInstanceId(), key -> new 
GrpcOmegaCallback(responseObserver));
+        .put(request.getInstanceId(), new GrpcOmegaCallback(responseObserver));
   }
 
   // TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback 
may not be registered on disconnected
@@ -84,6 +84,7 @@ public void onTxEvent(GrpcTxEvent message, 
StreamObserver<GrpcAck> responseObser
         message.getParentTxId().isEmpty() ? null : message.getParentTxId(),
         message.getType(),
         message.getCompensationMethod(),
+        message.getTimeout(),
         message.getPayloads().toByteArray()
     ));
 
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index afbdaf5d..086f88ec 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -17,9 +17,9 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.PENDING;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING;
 
 import java.lang.invoke.MethodHandles;
 import java.util.LinkedHashMap;
@@ -33,11 +33,9 @@
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.data.domain.PageRequest;
 
 public class SpringCommandRepository implements CommandRepository {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private static final PageRequest SINGLE_COMMAND_REQUEST = new PageRequest(0, 
1);
 
   private final TxEventEnvelopeRepository eventRepository;
   private final CommandEntityRepository commandRepository;
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index ad321482..d6ea21c3 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -25,6 +25,7 @@
 import org.springframework.data.domain.PageRequest;
 
 class SpringTxEventRepository implements TxEventRepository {
+  private static final PageRequest SINGLE_TX_EVENT_REQUEST = new 
PageRequest(0, 1);
   private final TxEventEnvelopeRepository eventRepo;
 
   SpringTxEventRepository(TxEventEnvelopeRepository eventRepo) {
@@ -36,6 +37,21 @@ public void save(TxEvent event) {
     eventRepo.save(event);
   }
 
+  @Override
+  public Optional<TxEvent> findFirstAbortedGlobalTransaction() {
+    return eventRepo.findFirstAbortedGlobalTxByType();
+  }
+
+  @Override
+  public List<TxEvent> findTimeoutEvents() {
+    return eventRepo.findTimeoutEvents(SINGLE_TX_EVENT_REQUEST);
+  }
+
+  @Override
+  public Optional<TxEvent> findTxStartedEventToCompensate(String globalTxId, 
String localTxId) {
+    return eventRepo.findFirstStartedEventByGlobalTxIdAndLocalTxId(globalTxId, 
localTxId);
+  }
+
   @Override
   public List<TxEvent> findTransactions(String globalTxId, String type) {
     return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type);
@@ -43,7 +59,7 @@ public void save(TxEvent event) {
 
   @Override
   public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, 
String type) {
-    return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, new 
PageRequest(0, 1));
+    return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, 
SINGLE_TX_EVENT_REQUEST);
   }
 
   @Override
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
new file mode 100644
index 00000000..ee754969
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING;
+
+import java.util.List;
+
+import javax.transaction.Transactional;
+
+import org.apache.servicecomb.saga.alpha.core.TxTimeout;
+import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
+import org.springframework.data.domain.PageRequest;
+
+public class SpringTxTimeoutRepository implements TxTimeoutRepository {
+  private final TxTimeoutEntityRepository timeoutRepo;
+
+  SpringTxTimeoutRepository(TxTimeoutEntityRepository timeoutRepo) {
+    this.timeoutRepo = timeoutRepo;
+  }
+
+  @Override
+  public void save(TxTimeout timeout) {
+    timeoutRepo.save(timeout);
+  }
+
+  @Override
+  public void markTimeoutAsDone() {
+    timeoutRepo.updateStatusOfFinishedTx();
+  }
+
+  @Transactional
+  @Override
+  public List<TxTimeout> findFirstTimeout() {
+    List<TxTimeout> timeoutEvents = 
timeoutRepo.findFirstTimeoutTxOrderByExpireTimeAsc(new PageRequest(0, 1));
+    timeoutEvents.forEach(event -> timeoutRepo
+        .updateStatusByGlobalTxIdAndLocalTxId(PENDING.name(), 
event.globalTxId(), event.localTxId()));
+    return timeoutEvents;
+  }
+}
\ No newline at end of file
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 2e52fef2..0eaf0898 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -31,8 +31,32 @@
 interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
   List<TxEvent> findByGlobalTxId(String globalTxId);
 
+  /**
+   * Selects a {@code TxAbortedEvent} whose global transaction has neither a
+   * {@code TxEndedEvent} nor a {@code SagaEndedEvent}.
+   *
+   * NOTE(review): the query has no ordering or row limit although a single
+   * {@code Optional} is returned - confirm the behaviour when several rows match.
+   */
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.type = 'TxAbortedEvent' AND NOT EXISTS( "
+      + "  SELECT t1.globalTxId FROM TxEvent t1"
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "    AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent'))")
+  Optional<TxEvent> findFirstAbortedGlobalTxByType();
+
+  /**
+   * Selects {@code TxStartedEvent} / {@code SagaStartedEvent} rows whose
+   * {@code expiryTime} lies before the current timestamp and for which no event
+   * with the same global and local transaction id but a different type exists.
+   *
+   * @param pageable paging applied to the query
+   * @return the matching start events
+   */
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.type IN ('TxStartedEvent', 'SagaStartedEvent') "
+      + "  AND t.expiryTime < CURRENT_TIMESTAMP AND NOT EXISTS( "
+      + "  SELECT t1.globalTxId FROM TxEvent t1 "
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "    AND t1.localTxId = t.localTxId "
+      + "    AND t1.type != t.type"
+      + ")")
+  List<TxEvent> findTimeoutEvents(Pageable pageable);
+
+  /**
+   * Selects the {@code TxStartedEvent} recorded for the given transaction ids.
+   *
+   * @param globalTxId bound to the first positional parameter of the query
+   * @param localTxId bound to the second positional parameter of the query
+   */
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.globalTxId = ?1 "
+      + "  AND t.localTxId = ?2 "
+      + "  AND t.type = 'TxStartedEvent'")
+  Optional<TxEvent> findFirstStartedEventByGlobalTxIdAndLocalTxId(String 
globalTxId, String localTxId);
+
   @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
-      + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, 
t.type, t.compensationMethod, t.payloads"
+      + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, 
"
+      + "t.type, t.compensationMethod, t.payloads"
       + ") FROM TxEvent t "
       + "WHERE t.globalTxId = ?1 AND t.type = ?2")
   List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String 
type);
@@ -75,7 +99,7 @@
   @Query("DELETE FROM TxEvent t "
       + "WHERE t.type = ?1 AND t.surrogateId NOT IN ("
       + " SELECT MAX(t1.surrogateId) FROM TxEvent t1 "
-      + " WHERE t1.type = ?1"
+      + " WHERE t1.type = ?1 "
       + " GROUP BY t1.globalTxId"
       + ")")
   void deleteByType(String type);
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
new file mode 100644
index 00000000..f0e264a4
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import java.util.List;
+
+import javax.persistence.LockModeType;
+import javax.transaction.Transactional;
+
+import org.apache.servicecomb.saga.alpha.core.TxTimeout;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Lock;
+import org.springframework.data.jpa.repository.Modifying;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+
+/**
+ * Spring Data repository for {@link TxTimeout} entities, with the JPQL queries
+ * used to find expired timeouts and to move them between statuses.
+ */
+interface TxTimeoutEntityRepository extends CrudRepository<TxTimeout, Long> {
+
+  /**
+   * Sets the status of every timeout matching the given transaction ids.
+   *
+   * @param status new status value written to the matching rows
+   * @param globalTxId global transaction id to match
+   * @param localTxId local transaction id to match
+   */
+  @Transactional
+  // clearAutomatically: the persistence context is cleared after the bulk update
+  @Modifying(clearAutomatically = true)
+  @Query("UPDATE org.apache.servicecomb.saga.alpha.core.TxTimeout t "
+      + "SET t.status = :status "
+      + "WHERE t.globalTxId = :globalTxId "
+      + "  AND t.localTxId = :localTxId")
+  void updateStatusByGlobalTxIdAndLocalTxId(
+      @Param("status") String status,
+      @Param("globalTxId") String globalTxId,
+      @Param("localTxId") String localTxId);
+
+  /**
+   * Selects timeouts in status {@code 'NEW'} whose {@code expiryTime} lies before
+   * the current timestamp, earliest expiry first, using optimistic lock mode.
+   *
+   * @param pageable paging applied to the query
+   * @return the matching timeouts in ascending expiry order
+   */
+  @Lock(LockModeType.OPTIMISTIC)
+  @Query("SELECT t FROM TxTimeout AS t "
+      + "WHERE t.status = 'NEW' "
+      + "  AND t.expiryTime < CURRENT_TIMESTAMP "
+      + "ORDER BY t.expiryTime ASC")
+  List<TxTimeout> findFirstTimeoutTxOrderByExpireTimeAsc(Pageable pageable);
+
+  /**
+   * Sets the status to {@code 'DONE'} on every timeout that is not yet
+   * {@code 'DONE'} and for which a {@code TxEvent} exists with the same global
+   * and local transaction id but a type different from the timeout's own type.
+   */
+  @Transactional
+  @Modifying(clearAutomatically = true)
+  @Query("UPDATE TxTimeout t "
+      + "SET t.status = 'DONE' "
+      + "WHERE t.status != 'DONE' AND EXISTS ("
+      + "  SELECT t1.globalTxId FROM TxEvent t1 "
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "    AND t1.localTxId = t.localTxId "
+      + "    AND t1.type != t.type"
+      + ")")
+  void updateStatusOfFinishedTx();
+}
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql 
b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index d6b51729..e7f774b7 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -8,6 +8,7 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   parentTxId varchar(36) DEFAULT NULL,
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
+  expiryTime timestamp(6) NOT NULL,
   payloads bytea
 );
 
@@ -30,3 +31,20 @@ CREATE TABLE IF NOT EXISTS Command (
 );
 
 CREATE INDEX IF NOT EXISTS saga_commands_index ON Command (surrogateId, 
eventId, globalTxId, localTxId, status);
+
+
+-- Timeout bookkeeping table backing the TxTimeout entity.
+CREATE TABLE IF NOT EXISTS TxTimeout (
+  surrogateId BIGSERIAL PRIMARY KEY,
+  eventId bigint NOT NULL UNIQUE,
+  serviceName varchar(36) NOT NULL, -- 36 as in the test schema; varchar(16) rejected longer service names
+  instanceId varchar(36) NOT NULL,
+  globalTxId varchar(36) NOT NULL,
+  localTxId varchar(36) NOT NULL,
+  parentTxId varchar(36) DEFAULT NULL,
+  type varchar(50) NOT NULL,
+  expiryTime TIMESTAMP NOT NULL,
+  status varchar(12),
+  version bigint NOT NULL
+);
+
+-- Composite index for TxTimeout lookups.
+-- NOTE(review): surrogateId is the leading column, while the timeout queries filter on
+-- status / expiryTime - confirm this column order is intended.
+CREATE INDEX IF NOT EXISTS saga_timeouts_index ON TxTimeout (surrogateId, 
expiryTime, globalTxId, localTxId, status);
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java
index a479e2b4..c212b3b4 100644
--- 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java
@@ -27,7 +27,6 @@
 import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
 import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
-import java.util.Date;
 import java.util.UUID;
 
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
@@ -40,13 +39,10 @@
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.web.servlet.MockMvc;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 @RunWith(SpringRunner.class)
 @WebMvcTest(AlphaEventController.class)
 public class AlphaEventControllerTest {
   private final TxEvent someEvent = someEvent();
-  private final ObjectMapper mapper = new ObjectMapper();
 
   @Autowired
   private MockMvc mockMvc;
@@ -72,7 +68,6 @@ private TxEvent someEvent() {
     return new TxEvent(
         uniquify("serviceName"),
         uniquify("instanceId"),
-        new Date(),
         uniquify("globalTxId"),
         uniquify("localTxId"),
         UUID.randomUUID().toString(),
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 539f610a..497c2443 100644
--- 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -19,7 +19,9 @@
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
@@ -30,7 +32,6 @@
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -47,6 +48,8 @@
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
+import org.apache.servicecomb.saga.alpha.core.TxTimeout;
+import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
@@ -107,11 +110,17 @@
   private CommandRepository commandRepository;
 
   @Autowired
-  private CommandEntityRepository commandEntityRepository;
+  private TxTimeoutRepository timeoutRepository;
+
+  @Autowired
+  private TxTimeoutEntityRepository timeoutEntityRepository;
 
   @Autowired
   private OmegaCallback omegaCallback;
 
+  @Autowired
+  private CommandEntityRepository commandEntityRepository;
+
   @Autowired
   private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
@@ -144,6 +153,7 @@ public void deleteAllTillSuccessful() {
       try {
         eventRepo.deleteAll();
         commandEntityRepository.deleteAll();
+        timeoutEntityRepository.deleteAll();
         deleted = true;
       } catch (Exception ignored) {
       }
@@ -367,6 +377,54 @@ public void sagaEndedEventIsAlwaysInTheEnd() throws 
Exception {
     });
   }
 
+  /**
+   * A SagaStartedEvent sent with a timeout of 1 must be followed by a
+   * TxAbortedEvent and a SagaEndedEvent, and its single timeout record must be DONE.
+   */
+  @Test
+  public void abortTimeoutSagaStartedEvent() {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+
+    GrpcTxEvent sagaStart = someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1);
+    blockingStub.onTxEvent(sagaStart);
+
+    // wait until the three expected events have been persisted
+    await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3);
+
+    List<TxEvent> persisted = eventRepo.findByGlobalTxId(globalTxId);
+    assertThat(persisted.get(0).type(), is(SagaStartedEvent.name()));
+    assertThat(persisted.get(1).type(), is(TxAbortedEvent.name()));
+    assertThat(persisted.get(2).type(), is(SagaEndedEvent.name()));
+
+    assertThat(timeoutEntityRepository.count(), is(1L));
+    for (TxTimeout timeout : timeoutEntityRepository.findAll()) {
+      assertThat(timeout.status(), is(DONE.name()));
+      assertThat(timeout.globalTxId(), is(globalTxId));
+      // the saga start event was sent with localTxId == globalTxId
+      assertThat(timeout.localTxId(), is(globalTxId));
+    }
+  }
+
+  /**
+   * A TxStartedEvent sent with a timeout of 1 inside a started saga must be
+   * followed by TxAbortedEvent, TxCompensatedEvent and SagaEndedEvent, and its
+   * single timeout record must be DONE.
+   */
+  @Test
+  public void abortTimeoutTxStartedEvent() {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+
+    GrpcTxEvent sagaStart = someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId);
+    blockingStub.onTxEvent(sagaStart);
+    GrpcTxEvent txStart = someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1);
+    blockingStub.onTxEvent(txStart);
+
+    // wait until five events exist and the latest one for this saga is SagaEndedEvent
+    await().atMost(2, SECONDS).until(() -> {
+      List<TxEvent> soFar = eventRepo.findByGlobalTxId(globalTxId);
+      if (eventRepo.count() != 5) {
+        return false;
+      }
+      TxEvent last = soFar.get(soFar.size() - 1);
+      return last.type().equals(SagaEndedEvent.name());
+    });
+
+    List<TxEvent> persisted = eventRepo.findByGlobalTxId(globalTxId);
+    assertThat(persisted.get(0).type(), is(SagaStartedEvent.name()));
+    assertThat(persisted.get(1).type(), is(TxStartedEvent.name()));
+    assertThat(persisted.get(2).type(), is(TxAbortedEvent.name()));
+    assertThat(persisted.get(3).type(), is(TxCompensatedEvent.name()));
+    assertThat(persisted.get(4).type(), is(SagaEndedEvent.name()));
+
+    assertThat(timeoutEntityRepository.count(), is(1L));
+    for (TxTimeout timeout : timeoutEntityRepository.findAll()) {
+      assertThat(timeout.status(), is(DONE.name()));
+      assertThat(timeout.globalTxId(), is(globalTxId));
+      assertThat(timeout.localTxId(), is(localTxId));
+    }
+  }
+
   private GrpcAck onCompensation(GrpcCompensateCommand command) {
     return blockingStub.onTxEvent(
         eventOf(TxCompensatedEvent,
@@ -387,7 +445,6 @@ private TxEvent someTxAbortEvent(String serviceName, String 
instanceId) {
     return new TxEvent(
         serviceName,
         instanceId,
-        new Date(),
         globalTxId,
         localTxId,
         parentTxId,
@@ -396,6 +453,10 @@ private TxEvent someTxAbortEvent(String serviceName, 
String instanceId) {
         payload.getBytes());
   }
 
+  /** Builds an event of the given type for the shared globalTxId, carrying the given timeout. */
+  private GrpcTxEvent someGrpcEventWithTimeout(
+      EventType type, String localTxId, String parentTxId, int timeout) {
+    byte[] payloads = payload.getBytes();
+    String compensationMethod = getClass().getCanonicalName();
+    return eventOf(type, globalTxId, localTxId, parentTxId, payloads, compensationMethod, timeout);
+  }
+
   private GrpcTxEvent someGrpcEvent(EventType type) {
     return eventOf(type, localTxId, parentTxId, payload.getBytes(), 
getClass().getCanonicalName());
   }
@@ -405,11 +466,11 @@ private GrpcTxEvent someGrpcEvent(EventType type, String 
globalTxId) {
   }
 
   private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String 
localTxId) {
-    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), getClass().getCanonicalName());
+    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), getClass().getCanonicalName(), 0);
   }
 
   private GrpcTxEvent eventOf(EventType eventType, String localTxId, String 
parentTxId, byte[] payloads, String compensationMethod) {
-    return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, 
compensationMethod);
+    return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, 
compensationMethod, 0);
   }
 
   private GrpcTxEvent eventOf(EventType eventType,
@@ -417,7 +478,8 @@ private GrpcTxEvent eventOf(EventType eventType,
       String localTxId,
       String parentTxId,
       byte[] payloads,
-      String compensationMethod) {
+      String compensationMethod,
+      int timeout) {
 
     return GrpcTxEvent.newBuilder()
         .setServiceName(serviceName)
@@ -428,6 +490,7 @@ private GrpcTxEvent eventOf(EventType eventType,
         .setParentTxId(parentTxId == null ? "" : parentTxId)
         .setType(eventType.name())
         .setCompensationMethod(compensationMethod)
+        .setTimeout(timeout)
         .setPayloads(ByteString.copyFrom(payloads))
         .build();
   }
@@ -472,7 +535,7 @@ void init() {
         Executors.newSingleThreadScheduledExecutor(),
         eventRepository,
         commandRepository,
-        omegaCallback,
-        1).run();
+        timeoutRepository,
+        omegaCallback, 1).run();
   }
 }
diff --git a/alpha/alpha-server/src/test/resources/schema.sql 
b/alpha/alpha-server/src/test/resources/schema.sql
index 344fdda2..929c69f8 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -8,8 +8,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   parentTxId varchar(36) DEFAULT NULL,
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
-  payloads varbinary(10240),
---  version bigint NOT NULL
+  expiryTime TIMESTAMP NOT NULL,
+  payloads varbinary(10240)
 );
 
 CREATE TABLE IF NOT EXISTS Command (
@@ -26,3 +26,17 @@ CREATE TABLE IF NOT EXISTS Command (
   lastModified TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
   version bigint NOT NULL
 );
+
+-- Test-schema table for TxTimeout records; surrogateId is an auto-generated
+-- identity starting at 1.
+CREATE TABLE IF NOT EXISTS TxTimeout (
+  surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT 
BY 1) PRIMARY KEY,
+  eventId bigint NOT NULL UNIQUE,
+  serviceName varchar(36) NOT NULL,
+  instanceId varchar(36) NOT NULL,
+  globalTxId varchar(36) NOT NULL,
+  localTxId varchar(36) NOT NULL,
+  parentTxId varchar(36) DEFAULT NULL,
+  type varchar(50) NOT NULL,
+  expiryTime TIMESTAMP NOT NULL,
+  status varchar(12),
+  version bigint NOT NULL
+);
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 8062ae90..bb24c5cb 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -97,7 +97,8 @@
   private final String localTxId = uniquify("localTxId");
   private final String parentTxId = uniquify("parentTxId");
   private final String compensationMethod = getClass().getCanonicalName();
-  private final TxEvent event = new TxEvent(EventType.TxStartedEvent, 
globalTxId, localTxId, parentTxId, compensationMethod, "blah");
+  private final TxEvent event = new TxEvent(EventType.TxStartedEvent, 
globalTxId, localTxId, parentTxId,
+      compensationMethod, 0, "blah");
 
   private final String serviceName = uniquify("serviceName");
   private final String[] addresses = {"localhost:8080", "localhost:8090"};
@@ -299,7 +300,7 @@ public void stopSendingWhenClusterIsDown() throws Exception 
{
   public void forwardSendResult() {
     assertThat(messageSender.send(event).aborted(), is(false));
 
-    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, 
parentTxId, "reject", "blah");
+    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, 
parentTxId, "reject", 0, "blah");
     assertThat(messageSender.send(rejectEvent).aborted(), is(true));
   }
 
@@ -356,6 +357,7 @@ public void onTxEvent(GrpcTxEvent request, 
StreamObserver<GrpcAck> responseObser
           request.getLocalTxId(),
           request.getParentTxId(),
           request.getCompensationMethod(),
+          0,
           new String(request.getPayloads().toByteArray())));
 
       sleep();
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 562c50f8..95bda85f 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -42,7 +42,7 @@
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
-  private final TxStartedEvent event = new TxStartedEvent(globalTxId, 
localTxId, null, "method x");
+  private final TxStartedEvent event = new TxStartedEvent(globalTxId, 
localTxId, null, "method x", 0);
 
   @Test
   public void sendEventWhenSenderIsAvailable() {
@@ -56,7 +56,7 @@ public void sendEventWhenSenderIsAvailable() {
 
   @Test
   public void blowsUpWhenEventIsSagaStarted() {
-    TxEvent event = new SagaStartedEvent(globalTxId, localTxId);
+    TxEvent event = new SagaStartedEvent(globalTxId, localTxId, 0);
 
     try {
       messageSender.send(event);
diff --git 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 7daf9547..9fd2a7ed 100644
--- 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -131,7 +131,7 @@ public void sendsUserToRemote_AroundTransaction() throws 
Exception {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
@@ -152,7 +152,7 @@ public void sendsAbortEvent_OnSubTransactionFailure() 
throws Exception {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, illegalUser).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, illegalUser).toString(),
             new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, throwable).toString()},
         toArray(messages)
     );
@@ -174,9 +174,9 @@ public void compensateOnTransactionException() throws 
Exception {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, anotherUser).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, 0, anotherUser).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod).toString()
@@ -196,9 +196,9 @@ public void passesOmegaContextThroughDifferentThreads() 
throws Exception {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
@@ -215,9 +215,9 @@ public void passesOmegaContextInThreadPool() throws 
Exception {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod, 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
@@ -237,7 +237,7 @@ public void passesOmegaContextThroughReactiveX() throws 
Exception {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
@@ -255,7 +255,7 @@ public void passesOmegaContextAmongActors() throws 
Exception {
 
     assertArrayEquals(
         new String[] {
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 074a5eca..53e51581 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -29,9 +29,9 @@
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, Object... message) {
-    return sender
-        .send(new TxStartedEvent(context.globalTxId(), context.localTxId(), 
parentTxId, compensationMethod, message));
+  public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, int timeout, Object... message) {
+    return sender.send(new TxStartedEvent(
+        context.globalTxId(), context.localTxId(), parentTxId, 
compensationMethod, timeout, message));
   }
 
   @Override
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index 44259496..bb2cca4b 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,7 @@
 public interface EventAwareInterceptor {
   EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
     @Override
-    public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, Object... message) {
+    public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, int timeout, Object... message) {
       return new AlphaResponse(false);
     }
 
@@ -33,9 +33,9 @@ public void onError(String parentTxId, String 
compensationMethod, Throwable thro
     }
   };
 
-  AlphaResponse preIntercept(String parentTxId, String compensationMethod, 
Object... message);
+  AlphaResponse preIntercept(String parentTxId, String compensationMethod, int 
timeout, Object... message);
 
-  void postIntercept(String parentTxId, String compensationMethod) throws 
Throwable;
+  void postIntercept(String parentTxId, String compensationMethod);
 
   void onError(String parentTxId, String compensationMethod, Throwable 
throwable);
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
index 7074d8fd..8c70e3a1 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
@@ -22,6 +22,6 @@
 public class SagaEndedEvent extends TxEvent {
 
   public SagaEndedEvent(String globalTxId, String localTxId) {
-    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "");
+    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0);
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 7ef021a2..d3d55fe5 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -32,9 +32,9 @@
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, Object... message) {
+  public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, int timeout, Object... message) {
     try {
-      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), 
omegaContext.localTxId()));
+      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), 
omegaContext.localTxId(), timeout));
     } catch (OmegaException e) {
       throw new TransactionalException(e.getMessage(), e.getCause());
     }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 09517522..388f237d 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -17,12 +17,8 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
@@ -38,7 +34,7 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
-  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+
   private final OmegaContext context;
 
   public SagaStartAspect(MessageSender sender, OmegaContext context) {
@@ -51,15 +47,13 @@ Object advise(ProceedingJoinPoint joinPoint, SagaStart 
sagaStart) throws Throwab
     initializeOmegaContext();
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
-    TimeAwareInterceptor interceptor = new 
TimeAwareInterceptor(sagaStartAnnotationProcessor);
-    interceptor.preIntercept(context.globalTxId(), method.toString());
+    sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), 
method.toString(), sagaStart.timeout());
     LOG.debug("Initialized context {} before execution of method {}", context, 
method.toString());
 
-    scheduleTimeoutTask(interceptor, method, sagaStart.timeout());
     try {
       Object result = joinPoint.proceed();
 
-      interceptor.postIntercept(context.globalTxId(), method.toString());
+      sagaStartAnnotationProcessor.postIntercept(context.globalTxId(), 
method.toString());
       LOG.debug("Transaction with context {} has finished.", context);
 
       return result;
@@ -74,20 +68,4 @@ Object advise(ProceedingJoinPoint joinPoint, SagaStart 
sagaStart) throws Throwab
   private void initializeOmegaContext() {
     context.setLocalTxId(context.newGlobalTxId());
   }
-
-  private void scheduleTimeoutTask(
-      TimeAwareInterceptor interceptor,
-      Method method,
-      int timeout) {
-
-    if (timeout > 0) {
-      executor.schedule(
-          () -> interceptor.onTimeout(
-              context.globalTxId(),
-              method.toString(),
-              new OmegaTxTimeoutException("Saga " + method.toString() + " 
timed out")),
-          timeout,
-          MILLISECONDS);
-    }
-  }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
index 54f61e41..cb76a265 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
@@ -20,9 +20,8 @@
 import org.apache.servicecomb.saga.common.EventType;
 
 public class SagaStartedEvent extends TxEvent {
-
-  public SagaStartedEvent(String globalTxId, String localTxId) {
+  public SagaStartedEvent(String globalTxId, String localTxId, int timeout) {
     // use "" instead of null as compensationMethod requires not null in sql
-    super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "");
+    super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", 
timeout);
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
deleted file mode 100644
index 2057fbc5..00000000
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.transaction;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-class TimeAwareInterceptor implements EventAwareInterceptor {
-  private final EventAwareInterceptor interceptor;
-  private final AtomicReference<EventAwareInterceptor> interceptorRef;
-  private Throwable throwable = null;
-
-  TimeAwareInterceptor(EventAwareInterceptor interceptor) {
-    this.interceptor = interceptor;
-    this.interceptorRef = new AtomicReference<>(interceptor);
-  }
-
-  @Override
-  public AlphaResponse preIntercept(String parentTxId, String signature, 
Object... args) {
-    return interceptor.preIntercept(parentTxId, signature, args);
-  }
-
-  @Override
-  public void postIntercept(String parentTxId, String signature) throws 
Throwable {
-    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.postIntercept(parentTxId, signature);
-    } else if (throwable != null) {
-      throw throwable;
-    }
-  }
-
-  @Override
-  public void onError(String parentTxId, String signature, Throwable 
throwable) {
-    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.onError(parentTxId, signature, throwable);
-    }
-  }
-
-  void onTimeout(String parentTxId, String signature, Throwable throwable) {
-    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.onError(parentTxId, signature, throwable);
-      this.throwable = throwable;
-    }
-  }
-}
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 5a61dc74..932b9901 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -17,12 +17,8 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import javax.transaction.InvalidTransactionException;
 
@@ -40,7 +36,7 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final OmegaContext context;
-  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+
   private final CompensableInterceptor interceptor;
 
   public TransactionAspect(MessageSender sender, OmegaContext context) {
@@ -58,8 +54,7 @@ Object advise(ProceedingJoinPoint joinPoint, Compensable 
compensable) throws Thr
     String localTxId = context.localTxId();
     context.newLocalTxId();
 
-    TimeAwareInterceptor interceptor = new 
TimeAwareInterceptor(this.interceptor);
-    AlphaResponse response = interceptor.preIntercept(localTxId, signature, 
joinPoint.getArgs());
+    AlphaResponse response = interceptor.preIntercept(localTxId, signature, 
compensable.timeout(), joinPoint.getArgs());
     if (response.aborted()) {
       String abortedLocalTxId = context.localTxId();
       context.setLocalTxId(localTxId);
@@ -68,9 +63,6 @@ Object advise(ProceedingJoinPoint joinPoint, Compensable 
compensable) throws Thr
     }
     LOG.debug("Updated context {} for compensable method {} ", context, 
method.toString());
 
-    // TODO: 2018/1/15 omega shall be stateless, all states shall be on alpha
-    scheduleTimeoutTask(interceptor, localTxId, signature, method, 
compensable.timeout());
-
     try {
       Object result = joinPoint.proceed();
       interceptor.postIntercept(localTxId, signature);
@@ -85,24 +77,6 @@ Object advise(ProceedingJoinPoint joinPoint, Compensable 
compensable) throws Thr
     }
   }
 
-  private void scheduleTimeoutTask(
-      TimeAwareInterceptor interceptor,
-      String localTxId,
-      String signature,
-      Method method,
-      int timeout) {
-
-    if (timeout > 0) {
-      executor.schedule(
-          () -> interceptor.onTimeout(
-              localTxId,
-              signature,
-              new OmegaTxTimeoutException("Transaction " + method.toString() + 
" timed out")),
-          timeout,
-          MILLISECONDS);
-    }
-  }
-
   private String compensationMethodSignature(ProceedingJoinPoint joinPoint, 
Compensable compensable, Method method)
       throws NoSuchMethodException {
 
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
index 13df2f73..d6aa5333 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
@@ -17,14 +17,14 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import org.apache.servicecomb.saga.common.EventType;
-
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
+import org.apache.servicecomb.saga.common.EventType;
+
 public class TxAbortedEvent extends TxEvent {
   public TxAbortedEvent(String globalTxId, String localTxId, String 
parentTxId, String compensationMethod, Throwable throwable) {
-    super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, stackTrace(throwable));
+    super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, 0, stackTrace(throwable));
   }
 
   private static String stackTrace(Throwable e) {
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
index dbbaeab5..8e288dfb 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
@@ -21,6 +21,6 @@
 
 public class TxCompensatedEvent extends TxEvent {
   public TxCompensatedEvent(String globalTxId, String localTxId, String 
parentTxId, String compensationMethod) {
-    super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod);
+    super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, 0);
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
index 4e587c85..8d6666a6 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
@@ -21,6 +21,6 @@
 
 public class TxEndedEvent extends TxEvent {
   public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, 
String compensationMethod) {
-    super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod);
+    super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, 0);
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
index 1398d3ea..34be420e 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
@@ -17,10 +17,10 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import org.apache.servicecomb.saga.common.EventType;
-
 import java.util.Arrays;
 
+import org.apache.servicecomb.saga.common.EventType;
+
 public class TxEvent {
 
   private final long timestamp;
@@ -29,9 +29,11 @@
   private final String localTxId;
   private final String parentTxId;
   private final String compensationMethod;
+  private final int timeout;
   private final Object[] payloads;
 
-  public TxEvent(EventType type, String globalTxId, String localTxId, String 
parentTxId, String compensationMethod, Object... payloads) {
+  public TxEvent(EventType type, String globalTxId, String localTxId, String 
parentTxId, String compensationMethod,
+      int timeout, Object... payloads) {
     this.timestamp = System.currentTimeMillis();
     this.type = type;
     this.localTxId = localTxId;
@@ -39,6 +41,7 @@ public TxEvent(EventType type, String globalTxId, String 
localTxId, String paren
     this.compensationMethod = compensationMethod;
     this.payloads = payloads;
     this.globalTxId = globalTxId;
+    this.timeout = timeout;
   }
 
   public long timestamp() {
@@ -69,6 +72,10 @@ public String compensationMethod() {
     return compensationMethod;
   }
 
+  public int timeout() {
+    return timeout;
+  }
+
   @Override
   public String toString() {
     return type.name() + "{" +
@@ -76,6 +83,7 @@ public String toString() {
         ", localTxId='" + localTxId + '\'' +
         ", parentTxId='" + parentTxId + '\'' +
         ", compensationMethod='" + compensationMethod + '\'' +
+        ", timeout='" + timeout + '\'' +
         ", payloads=" + Arrays.toString(payloads) +
         '}';
   }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
index ce93ea3e..4732d952 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -21,7 +21,8 @@
 
 public class TxStartedEvent extends TxEvent {
 
-  public TxStartedEvent(String globalTxId, String localTxId, String 
parentTxId, String compensationMethod, Object... payloads) {
-    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, payloads);
+  public TxStartedEvent(String globalTxId, String localTxId, String parentTxId,
+      String compensationMethod, int timeout, Object... payloads) {
+    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, 
compensationMethod, timeout, payloads);
   }
 }
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 21af7e6e..0ef9d4dc 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -62,7 +62,7 @@ public void setUp() throws Exception {
 
   @Test
   public void sendsTxStartedEventBefore() throws Exception {
-    interceptor.preIntercept(parentTxId, compensationMethod, message);
+    interceptor.preIntercept(parentTxId, compensationMethod, 0, message);
 
     TxEvent event = messages.get(0);
 
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index 566a456b..cc84fc57 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -65,7 +65,7 @@ public void setUp() throws Exception {
 
   @Test
   public void sendsSagaStartedEvent() {
-    sagaStartAnnotationProcessor.preIntercept(null, null);
+    sagaStartAnnotationProcessor.preIntercept(null, null, 0);
 
     TxEvent event = messages.get(0);
 
@@ -99,7 +99,7 @@ public void transformInterceptedException() {
     doThrow(exception).when(sender).send(any());
 
     try {
-      sagaStartAnnotationProcessor.preIntercept(null, null);
+      sagaStartAnnotationProcessor.preIntercept(null, null, 0);
       expectFailing(TransactionalException.class);
     } catch (TransactionalException e) {
       assertThat(e.getMessage(), is("exception"));
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index 1bc2b285..77d40ef9 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -18,20 +18,14 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
@@ -62,8 +56,6 @@
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final SagaStartAspect aspect = new SagaStartAspect(sender, 
omegaContext);
 
-  private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(globalTxId);
@@ -120,42 +112,6 @@ public void clearContextOnSagaStartError() throws 
Throwable {
     assertThat(omegaContext.localTxId(), is(nullValue()));
   }
 
-  @Test
-  public void sendsAbortEventOnTimeout() throws Throwable {
-    CountDownLatch latch = new CountDownLatch(1);
-    when(sagaStart.timeout()).thenReturn(100);
-    when(joinPoint.proceed()).thenAnswer(invocationOnMock -> {
-      latch.await();
-      assertThat(omegaContext.localTxId(), is(globalTxId));
-      return null;
-    });
-
-    ExpectedException exception = ExpectedException.none();
-    executor.execute(() -> {
-      try {
-        aspect.advise(joinPoint, sagaStart);
-      } catch (Throwable throwable) {
-        exception.expect(OmegaTxTimeoutException.class);
-      }
-    });
-
-    await().atMost(1, SECONDS).until(() -> messages.size() == 2);
-
-    TxEvent event = messages.get(1);
-
-    assertThat(event.globalTxId(), is(globalTxId));
-    assertThat(event.localTxId(), is(globalTxId));
-    assertThat(event.parentTxId(), is(nullValue()));
-    assertThat(event.type(), is(EventType.TxAbortedEvent));
-
-    latch.countDown();
-
-    await().atMost(1, SECONDS).until(() -> omegaContext.localTxId() == null);
-
-    // no redundant ended message received
-    assertThat(messages.size(), is(2));
-  }
-
   private String doNothing() {
     return "doNothing";
   }
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
deleted file mode 100644
index 1136a450..00000000
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.transaction;
-
-import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-public class TimeAwareInterceptorTest {
-  private static final int runningCounts = 1000;
-
-  private final String localTxId = uniquify("localTxId");
-  private final String signature = uniquify("signature");
-
-  private final AtomicInteger postInterceptInvoked = new AtomicInteger();
-  private final AtomicInteger onErrorInvoked = new AtomicInteger();
-  private final AtomicInteger onTimeoutInvoked = new AtomicInteger();
-
-  private final EventAwareInterceptor underlying = new EventAwareInterceptor() 
{
-    @Override
-    public AlphaResponse preIntercept(String parentTxId, String 
compensationMethod, Object... message) {
-      return new AlphaResponse(false);
-    }
-
-    @Override
-    public void postIntercept(String parentTxId, String compensationMethod) {
-      postInterceptInvoked.incrementAndGet();
-    }
-
-    @Override
-    public void onError(String parentTxId, String compensationMethod, 
Throwable throwable) {
-      if (throwable instanceof OmegaTxTimeoutException) {
-        onTimeoutInvoked.incrementAndGet();
-      } else {
-        onErrorInvoked.incrementAndGet();
-      }
-    }
-  };
-
-  private final ExecutorService executorService = 
Executors.newFixedThreadPool(2);
-  private final RuntimeException timeoutException = new 
OmegaTxTimeoutException("timed out");
-
-
-  @Test(timeout = 5000)
-  public void invokeEitherPostInterceptOrOnTimeoutConcurrently() throws 
Exception {
-    List<Future<?>> futures = new LinkedList<>();
-
-    for (int i = 0; i < runningCounts; i++) {
-      TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
-      CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
-      ExpectedException exception = ExpectedException.none();
-
-      futures.add(executorService.submit(() -> {
-        try {
-          waitForSignal(cyclicBarrier);
-          interceptor.postIntercept(localTxId, signature);
-        } catch (Throwable throwable) {
-          exception.expect(OmegaTxTimeoutException.class);
-        }
-      }));
-
-      futures.add(executorService.submit(() -> {
-        waitForSignal(cyclicBarrier);
-        interceptor.onTimeout(localTxId, signature, timeoutException);
-      }));
-    }
-
-    waitTillAllDone(futures);
-
-    assertThat(postInterceptInvoked.get() + onTimeoutInvoked.get(), 
is(runningCounts));
-  }
-
-  @Test(timeout = 5000)
-  public void invokeEitherOnErrorOrOnTimeoutConcurrently() throws Exception {
-    RuntimeException oops = new RuntimeException("oops");
-    List<Future<?>> futures = new LinkedList<>();
-
-    for (int i = 0; i < runningCounts; i++) {
-      TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
-      CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
-
-
-      futures.add(executorService.submit(() -> {
-        waitForSignal(cyclicBarrier);
-        interceptor.onError(localTxId, signature, oops);
-      }));
-
-      futures.add(executorService.submit(() -> {
-        waitForSignal(cyclicBarrier);
-        interceptor.onTimeout(localTxId, signature, timeoutException);
-      }));
-    }
-
-    waitTillAllDone(futures);
-
-    assertThat(onErrorInvoked.get() + onTimeoutInvoked.get(), 
is(runningCounts));
-  }
-
-  private void waitForSignal(CyclicBarrier cyclicBarrier) {
-    try {
-      cyclicBarrier.await();
-    } catch (InterruptedException | BrokenBarrierException e) {
-      fail(e.getMessage());
-    }
-  }
-
-  private void waitTillAllDone(List<Future<?>> futures)
-      throws InterruptedException, java.util.concurrent.ExecutionException {
-    for (Future<?> future : futures) {
-      future.get();
-    }
-  }
-}
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 8689a1e6..31d148fe 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -18,11 +18,8 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.awaitility.Awaitility.await;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -32,9 +29,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import javax.transaction.InvalidTransactionException;
 
@@ -69,8 +63,6 @@
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final TransactionAspect aspect = new TransactionAspect(sender, 
omegaContext);
 
-  private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(newLocalTxId);
@@ -130,46 +122,6 @@ public void restoreContextOnCompensableError() throws 
Throwable {
     assertThat(omegaContext.localTxId(), is(localTxId));
   }
 
-  @Test
-  public void sendsAbortEventOnTimeout() throws Throwable {
-    CountDownLatch latch = new CountDownLatch(1);
-    when(compensable.timeout()).thenReturn(100);
-    when(joinPoint.proceed()).thenAnswer(invocationOnMock -> {
-      latch.await();
-      assertThat(omegaContext.localTxId(), is(newLocalTxId));
-      return null;
-    });
-
-    ExpectedException exception = ExpectedException.none();
-    executor.execute(() -> {
-      try {
-        // need to setup the thread local for it
-        omegaContext.setGlobalTxId(globalTxId);
-        omegaContext.setLocalTxId(localTxId);
-
-        aspect.advise(joinPoint, compensable);
-      } catch (Throwable throwable) {
-        exception.expect(OmegaTxTimeoutException.class);
-      }
-    });
-
-    await().atMost(1, SECONDS).until(() -> messages.size() == 2);
-
-    TxEvent event = messages.get(1);
-
-    assertThat(event.globalTxId(), is(globalTxId));
-    assertThat(event.localTxId(), is(newLocalTxId));
-    assertThat(event.parentTxId(), is(localTxId));
-    assertThat(event.type(), is(EventType.TxAbortedEvent));
-
-    latch.countDown();
-
-    await().atMost(1, SECONDS).until(() -> 
localTxId.equals(omegaContext.localTxId()));
-
-    // no redundant ended message received
-    assertThat(messages.size(), is(2));
-  }
-
   @Test
   public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
     MessageSender sender = mock(MessageSender.class);
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto 
b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 26368814..3944eee9 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -46,6 +46,7 @@ message GrpcTxEvent {
   bytes payloads = 7;
   string serviceName = 8;
   string instanceId = 9;
+  int32 timeout = 10;
 }
 
 message GrpcCompensateCommand {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to