This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 82ab5382df266a1566f38dcd0251fd29fe06d9f0
Author: Eric Lee <dagang...@huawei.com>
AuthorDate: Mon Jan 29 11:33:34 2018 +0800

    SCB-239 avoid duplicate abort events in concurrent processing
    
    Signed-off-by: Eric Lee <dagang...@huawei.com>
---
 .../servicecomb/saga/alpha/core/Command.java       |  2 +-
 .../servicecomb/saga/alpha/core/EventScanner.java  | 24 +++----
 .../core/{CommandStatus.java => TaskStatus.java}   |  2 +-
 .../saga/alpha/core/TxConsistentService.java       | 35 +++++++++-
 .../servicecomb/saga/alpha/core/TxEvent.java       | 39 ++++++-----
 .../saga/alpha/core/TxEventRepository.java         |  4 +-
 .../servicecomb/saga/alpha/core/TxTimeout.java     | 81 ++++++++++++++++++++++
 .../saga/alpha/core/TxTimeoutRepository.java       | 14 ++--
 .../alpha/core/CompositeOmegaCallbackTest.java     |  3 -
 .../saga/alpha/core/TxConsistentServiceTest.java   | 74 +++++++++++++++++---
 .../servicecomb/saga/alpha/core/TxEventMaker.java  |  3 -
 .../servicecomb/saga/alpha/server/AlphaConfig.java | 15 ++--
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  6 +-
 .../saga/alpha/server/SpringCommandRepository.java |  8 +--
 .../saga/alpha/server/SpringTxEventRepository.java | 12 ++--
 .../alpha/server/SpringTxTimeoutRepository.java    | 63 +++++++++++++++++
 .../alpha/server/TxEventEnvelopeRepository.java    | 21 ++----
 .../alpha/server/TxTimeoutEntityRepository.java    | 68 ++++++++++++++++++
 .../src/main/resources/schema-postgresql.sql       | 16 ++++-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 40 ++++++++---
 alpha/alpha-server/src/test/resources/schema.sql   | 12 +++-
 .../saga/omega/transaction/SagaStartAspect.java    |  5 +-
 .../transaction/OnceAwareInterceptorTest.java      |  7 +-
 23 files changed, 439 insertions(+), 115 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 49c1756..1e6f21b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -17,7 +17,7 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
 
 import java.util.Date;
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 0980f3a..c10c09a 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -25,7 +25,6 @@ import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
-import java.util.Arrays;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.slf4j.Logger;
@@ -38,22 +37,23 @@ public class EventScanner implements Runnable {
   private final ScheduledExecutorService scheduler;
   private final TxEventRepository eventRepository;
   private final CommandRepository commandRepository;
+  private final TxTimeoutRepository timeoutRepository;
   private final OmegaCallback omegaCallback;
   private final int eventPollingInterval;
 
   private long nextEndedEventId;
   private long nextCompensatedEventId;
-  private long nextTimeoutEventId;
 
   public EventScanner(ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
+      TxTimeoutRepository timeoutRepository,
       OmegaCallback omegaCallback,
       int eventPollingInterval) {
-
     this.scheduler = scheduler;
     this.eventRepository = eventRepository;
     this.commandRepository = commandRepository;
+    this.timeoutRepository = timeoutRepository;
     this.omegaCallback = omegaCallback;
     this.eventPollingInterval = eventPollingInterval;
   }
@@ -70,7 +70,7 @@ public class EventScanner implements Runnable {
           saveUncompensatedEventsToCommands();
           compensate();
           updateCompensatedCommands();
-          deleteDuplicateEvents();
+          deleteDuplicateSagaEndedEvents();
           updateTransactionStatus();
         },
         0,
@@ -96,9 +96,9 @@ public class EventScanner implements Runnable {
         });
   }
 
-  private void deleteDuplicateEvents() {
+  private void deleteDuplicateSagaEndedEvents() {
     try {
-      eventRepository.deleteDuplicateEvents(Arrays.asList(TxAbortedEvent.name(), SagaEndedEvent.name()));
+      eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
     } catch (Exception e) {
       log.warn("Failed to delete duplicate event", e);
     }
@@ -114,11 +114,11 @@ public class EventScanner implements Runnable {
   }
 
   private void abortTimeoutEvent() {
-    eventRepository.findFirstTimeoutEventByIdGreaterThan(nextTimeoutEventId)
-    .ifPresent((TxEvent event) -> {
+    timeoutRepository.findFirstTimeoutTxToAbort().forEach(event -> {
       log.info("Found timeout event {}", event);
-      nextTimeoutEventId = event.id();
+
       eventRepository.save(toTxAbortedEvent(event));
+      timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId());
     });
   }
 
@@ -134,6 +134,7 @@ public class EventScanner implements Runnable {
 
   private void markGlobalTxEnd(TxEvent event) {
     eventRepository.save(toSagaEndedEvent(event));
+    timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId());
     log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
   }
 
@@ -146,8 +147,7 @@ public class EventScanner implements Runnable {
         event.parentTxId(),
         TxAbortedEvent.name(),
         "",
-        null,
-        EMPTY_PAYLOAD);
+        ("Transaction timeout").getBytes());
   }
 
   private TxEvent toSagaEndedEvent(TxEvent event) {
@@ -159,7 +159,6 @@ public class EventScanner implements Runnable {
         null,
         SagaEndedEvent.name(),
         "",
-        null,
         EMPTY_PAYLOAD);
   }
 
@@ -183,7 +182,6 @@ public class EventScanner implements Runnable {
         command.parentTxId(),
         TxStartedEvent.name(),
         command.compensationMethod(),
-        null,
         command.payloads()
     );
   }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TaskStatus.java
similarity index 96%
rename from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
rename to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TaskStatus.java
index 0c9b78b..442213b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TaskStatus.java
@@ -17,7 +17,7 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-public enum CommandStatus {
+public enum TaskStatus {
   NEW,
   PENDING,
   DONE
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index c55090a..26309b6 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,10 +17,18 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
+import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.concurrent.CompletableFuture;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,9 +37,12 @@ public class TxConsistentService {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final TxEventRepository eventRepository;
+  private final TxTimeoutRepository timeoutRepository;
 
-  public TxConsistentService(TxEventRepository eventRepository) {
+  public TxConsistentService(TxEventRepository eventRepository,
+      TxTimeoutRepository timeoutRepository) {
     this.eventRepository = eventRepository;
+    this.timeoutRepository = timeoutRepository;
   }
 
   public boolean handle(TxEvent event) {
@@ -40,11 +51,33 @@ public class TxConsistentService {
       return false;
     }
 
+    if (isEventWithTimeout(event)) {
+      CompletableFuture.runAsync(() -> saveTxTimeout(event));
+    }
+
     eventRepository.save(event);
 
+    if (Arrays.asList(TxEndedEvent.name(), SagaEndedEvent.name(), TxAbortedEvent.name()).contains(event.type())) {
+      CompletableFuture.runAsync(() -> timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId()));
+    }
+
     return true;
   }
 
+  private boolean isEventWithTimeout(TxEvent event) {
+    return Arrays.asList(TxStartedEvent.name(), SagaStartedEvent.name()).contains(event.type()) && event.timeout() != 0;
+  }
+
+  private void saveTxTimeout(TxEvent event) {
+    Date expireTime = new Date(event.creationTime().getTime() + SECONDS.toMillis(event.timeout()));
+    timeoutRepository.save(
+        new TxTimeout(
+            event.globalTxId(),
+            event.localTxId(),
+            expireTime,
+            NEW.name()));
+  }
+
   private boolean isGlobalTxAborted(TxEvent event) {
     return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
   }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 04d385b..5966541 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -23,6 +23,8 @@ import javax.persistence.Entity;
 import javax.persistence.GeneratedValue;
 import javax.persistence.GenerationType;
 import javax.persistence.Id;
+import javax.persistence.Transient;
+import javax.persistence.Version;
 
 @Entity
 public class TxEvent {
@@ -33,7 +35,6 @@ public class TxEvent {
   private String serviceName;
   private String instanceId;
   private Date creationTime;
-  private Date expireTime;
   private String globalTxId;
   private String localTxId;
   private String parentTxId;
@@ -41,6 +42,12 @@ public class TxEvent {
   private String compensationMethod;
   private byte[] payloads;
 
+  @Version
+  private long version;
+
+  @Transient
+  private int timeout;
+
   private TxEvent() {
   }
 
@@ -65,27 +72,25 @@ public class TxEvent {
       String parentTxId,
       String type,
       String compensationMethod,
-      Date expireTime,
       byte[] payloads) {
-    this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, expireTime, payloads);
+    this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, 0, payloads);
   }
 
   public TxEvent(
       String serviceName,
       String instanceId,
-      Date creationTime,
       String globalTxId,
       String localTxId,
       String parentTxId,
       String type,
       String compensationMethod,
-      Date expireTime,
+      int timeout,
       byte[] payloads) {
-    this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod,
-        expireTime, payloads);
+    this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, timeout,
+        payloads);
   }
 
-  TxEvent(Long surrogateId,
+  public TxEvent(
       String serviceName,
       String instanceId,
       Date creationTime,
@@ -96,8 +101,8 @@ public class TxEvent {
       String compensationMethod,
       int timeout,
       byte[] payloads) {
-    this(surrogateId, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type,
-        compensationMethod, timeout == 0 ? null : new Date(creationTime.getTime() + timeout*1000), payloads);
+    this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod,
+        timeout, payloads);
   }
 
   TxEvent(Long surrogateId,
@@ -109,7 +114,7 @@ public class TxEvent {
       String parentTxId,
       String type,
       String compensationMethod,
-      Date expireTime,
+      int timeout,
       byte[] payloads) {
 
     this.surrogateId = surrogateId;
@@ -121,8 +126,8 @@ public class TxEvent {
     this.parentTxId = parentTxId;
     this.type = type;
     this.compensationMethod = compensationMethod;
-    this.expireTime = expireTime;
     this.payloads = payloads;
+    this.timeout = timeout;
   }
 
   public String serviceName() {
@@ -157,10 +162,6 @@ public class TxEvent {
     return compensationMethod;
   }
 
-  public Date expireTime() {
-    return expireTime;
-  }
-
   public byte[] payloads() {
     return payloads;
   }
@@ -169,6 +170,10 @@ public class TxEvent {
     return surrogateId;
   }
 
+  public int timeout() {
+    return timeout;
+  }
+
   @Override
   public String toString() {
     return "TxEvent{" +
@@ -181,7 +186,7 @@ public class TxEvent {
         ", parentTxId='" + parentTxId + '\'' +
         ", type='" + type + '\'' +
         ", compensationMethod='" + compensationMethod + '\'' +
-        ", expireTime='" + expireTime + '\'' +
+        ", timeout='" + timeout + '\'' +
         '}';
   }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index ec564f9..b974bd9 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -31,7 +31,5 @@ public interface TxEventRepository {
 
   Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type);
 
-  Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id);
-
-  void deleteDuplicateEvents(List<String> types);
+  void deleteDuplicateEvents(String type);
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
new file mode 100644
index 0000000..dc365e3
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.core;
+
+import java.util.Date;
+
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Version;
+
+@Entity
+public class TxTimeout {
+  @Id
+  @GeneratedValue(strategy = GenerationType.IDENTITY)
+  private Long surrogateId;
+
+  private String globalTxId;
+  private String localTxId;
+  private Date expireTime;
+  private String status;
+
+  @Version
+  private long version;
+
+  TxTimeout() {
+  }
+
+  public TxTimeout(String globalTxId, String localTxId, Date expireTime, String status) {
+    this.globalTxId = globalTxId;
+    this.localTxId = localTxId;
+    this.expireTime = expireTime;
+    this.status = status;
+  }
+
+  public String globalTxId() {
+    return globalTxId;
+  }
+
+  public String localTxId() {
+    return localTxId;
+  }
+
+  public Date expireTime() {
+    return expireTime;
+  }
+
+  public String status() {
+    return status;
+  }
+
+  public void setStatus(String status) {
+    this.status = status;
+  }
+
+  @Override
+  public String toString() {
+    return "TxTimeout{" +
+        "globalTxId='" + globalTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        ", expireTime=" + expireTime +
+        ", status=" + status +
+        '}';
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
similarity index 76%
rename from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
rename to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
index eb820d6..88758c7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
@@ -15,10 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.saga.omega.transaction;
+package org.apache.servicecomb.saga.alpha.core;
 
-public class OmegaTxTimeoutException extends RuntimeException {
-  public OmegaTxTimeoutException(String cause) {
-    super(cause);
-  }
+import java.util.List;
+
+public interface TxTimeoutRepository {
+  void save(TxTimeout event);
+
+  void markTxTimeoutAsDone(String globalTxId, String localTxId);
+
+  List<TxEvent> findFirstTimeoutTxToAbort();
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
index b201ebe..4ded48a 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
@@ -28,7 +28,6 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
-import java.util.Date;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -160,13 +159,11 @@ public class CompositeOmegaCallbackTest {
     return new TxEvent(
         serviceName,
         instanceId,
-        new Date(),
         uniquify("globalTxId"),
         uniquify("localTxId"),
         UUID.randomUUID().toString(),
         eventType.name(),
         getClass().getCanonicalName(),
-        null,
         uniquify("blah").getBytes());
   }
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index a73c754..43d3164 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -19,17 +19,19 @@ package org.apache.servicecomb.saga.alpha.core;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
-import java.util.Date;
 import java.util.Deque;
 import java.util.List;
 import java.util.Optional;
@@ -71,12 +73,31 @@ public class TxConsistentServiceTest {
     }
 
     @Override
-    public Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id) {
-      return Optional.empty();
+    public void deleteDuplicateEvents(String type) {
+    }
+  };
+
+  private final Deque<TxTimeout> timeouts = new ConcurrentLinkedDeque<>();
+  private final TxTimeoutRepository timeoutRepository = new TxTimeoutRepository() {
+    @Override
+    public void save(TxTimeout timeout) {
+      timeouts.add(timeout);
+    }
+
+    @Override
+    public void markTxTimeoutAsDone(String globalTxId, String localTxId) {
+      for (TxTimeout timeout : timeouts) {
+        if (timeout.globalTxId().equals(globalTxId) &&
+            timeout.localTxId().equals(localTxId)) {
+          timeout.setStatus(DONE.name());
+          break;
+        }
+      }
     }
 
     @Override
-    public void deleteDuplicateEvents(List<String> types) {
+    public List<TxEvent> findFirstTimeoutTxToAbort() {
+      return null;
     }
   };
 
@@ -88,7 +109,7 @@ public class TxConsistentServiceTest {
 
   private final String compensationMethod = getClass().getCanonicalName();
 
-  private final TxConsistentService consistentService = new TxConsistentService(eventRepository);
+  private final TxConsistentService consistentService = new TxConsistentService(eventRepository, timeoutRepository);
   private final byte[] payloads = "yeah".getBytes();
 
   @Test
@@ -105,6 +126,7 @@ public class TxConsistentServiceTest {
     }
 
     assertThat(this.events, contains(events));
+    assertThat(timeouts.isEmpty(), is(true));
   }
 
   @Test
@@ -118,20 +140,56 @@ public class TxConsistentServiceTest {
     consistentService.handle(event);
 
     assertThat(events.size(), is(2));
+    assertThat(timeouts.isEmpty(), is(true));
+  }
+
+  @Test
+  public void persistTimeoutEventOnArrival() {
+    TxEvent[] events = {
+        newEventWithTimeout(SagaStartedEvent, globalTxId,3),
+        newEventWithTimeout(TxStartedEvent, 2),
+        newEvent(TxEndedEvent),
+        newEvent(TxCompensatedEvent),
+        eventOf(SagaEndedEvent, globalTxId)};
+
+    for (TxEvent event : events) {
+      consistentService.handle(event);
+    }
+
+    assertThat(this.events, contains(events));
+    assertThat(timeouts.size(), is(2));
+    await().atMost(1, SECONDS).until(this::allTimeoutIsDone);
+  }
+
+  private boolean allTimeoutIsDone() {
+    for (TxTimeout timeout : timeouts) {
+      if (!timeout.status().equals(DONE.name())) {
+        return false;
+      }
+    }
+    return true;
   }
 
   private TxEvent newEvent(EventType eventType) {
-    return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, null, payloads);
+    return newEventWithTimeout(eventType, 0);
+  }
+
+  private TxEvent newEventWithTimeout(EventType eventType, int timeout) {
+    return newEventWithTimeout(eventType, localTxId, timeout);
+  }
+
+  private TxEvent newEventWithTimeout(EventType eventType, String localTxId, int timeout) {
+    return new TxEvent(serviceName, instanceId, globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, timeout, payloads);
   }
 
   private TxEvent eventOf(EventType eventType, String localTxId) {
-    return new TxEvent(serviceName, instanceId, new Date(),
+    return new TxEvent(serviceName,
+        instanceId,
         globalTxId,
         localTxId,
         UUID.randomUUID().toString(),
         eventType.name(),
         compensationMethod,
-        null,
         payloads);
   }
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
index 4b65528..c14ffd9 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
@@ -20,7 +20,6 @@ package org.apache.servicecomb.saga.alpha.core;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
-import java.util.Date;
 import java.util.UUID;
 
 class TxEventMaker {
@@ -28,13 +27,11 @@ class TxEventMaker {
     return new TxEvent(
         uniquify("serviceName"),
         uniquify("instanceId"),
-        new Date(),
         uniquify("globalTxId"),
         uniquify("localTxId"),
         UUID.randomUUID().toString(),
         TxStartedEvent.name(),
         TxEventMaker.class.getCanonicalName(),
-        null,
         uniquify("blah").getBytes());
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index a431437..9472d0d 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -35,6 +35,7 @@ import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
 import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
+import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
 import org.springframework.context.annotation.Bean;
@@ -70,6 +71,11 @@ class AlphaConfig {
   }
 
   @Bean
+  TxTimeoutRepository springTxTimeoutRepository(TxTimeoutEntityRepository timeoutRepo) {
+    return new SpringTxTimeoutRepository(timeoutRepo);
+  }
+
+  @Bean
   ScheduledExecutorService compensationScheduler() {
     return scheduler;
   }
@@ -80,16 +86,15 @@ class AlphaConfig {
       ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
+      TxTimeoutRepository timeoutRepository,
       OmegaCallback omegaCallback,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
 
     new EventScanner(scheduler,
-        eventRepository,
-        commandRepository,
-        omegaCallback,
-        eventPollingInterval).run();
+        eventRepository, commandRepository, timeoutRepository,
+        omegaCallback, eventPollingInterval).run();
 
-    TxConsistentService consistentService = new TxConsistentService(eventRepository);
+    TxConsistentService consistentService = new TxConsistentService(eventRepository, timeoutRepository);
 
     ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 679b6ba..ee7e2e4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -75,18 +75,16 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   @Override
   public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
-    Date date = new Date(message.getTimestamp());
-    int timeout = message.getTimeout();
     boolean ok = txConsistentService.handle(new TxEvent(
         message.getServiceName(),
         message.getInstanceId(),
-        date,
+        new Date(message.getTimestamp()),
         message.getGlobalTxId(),
         message.getLocalTxId(),
         message.getParentTxId().isEmpty() ? null : message.getParentTxId(),
         message.getType(),
         message.getCompensationMethod(),
-        timeout == 0 ? null : new Date(date.getTime() + timeout),
+        message.getTimeout(),
         message.getPayloads().toByteArray()
     ));
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index afbdaf5..086f88e 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -17,9 +17,9 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.PENDING;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING;
 
 import java.lang.invoke.MethodHandles;
 import java.util.LinkedHashMap;
@@ -33,11 +33,9 @@ import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.data.domain.PageRequest;
 
 public class SpringCommandRepository implements CommandRepository {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private static final PageRequest SINGLE_COMMAND_REQUEST = new PageRequest(0, 1);
 
   private final TxEventEnvelopeRepository eventRepository;
   private final CommandEntityRepository commandRepository;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index c3e8f04..5531d8f 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -25,6 +25,7 @@ import 
org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.springframework.data.domain.PageRequest;
 
 class SpringTxEventRepository implements TxEventRepository {
+  private static final PageRequest SINGLE_TX_EVENT_REQUEST = new 
PageRequest(0, 1);
   private final TxEventEnvelopeRepository eventRepo;
 
   SpringTxEventRepository(TxEventEnvelopeRepository eventRepo) {
@@ -48,7 +49,7 @@ class SpringTxEventRepository implements TxEventRepository {
 
   @Override
   public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, 
String type) {
-    return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, new 
PageRequest(0, 1));
+    return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, 
SINGLE_TX_EVENT_REQUEST);
   }
 
   @Override
@@ -57,12 +58,7 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
-  public Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id) {
-    return eventRepo.findFirstTimeoutSurrogateIdGreaterThan(id);
-  }
-
-  @Override
-  public void deleteDuplicateEvents(List<String> types) {
-    eventRepo.deleteByTypes(types);
+  public void deleteDuplicateEvents(String type) {
+    eventRepo.deleteByType(type);
   }
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
new file mode 100644
index 0000000..71c808d
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.transaction.Transactional;
+
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.alpha.core.TxTimeout;
+import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
+import org.springframework.data.domain.PageRequest;
+
+public class SpringTxTimeoutRepository implements TxTimeoutRepository {
+  private final TxTimeoutEntityRepository timeoutRepo;
+
+  SpringTxTimeoutRepository(TxTimeoutEntityRepository timeoutRepo) {
+    this.timeoutRepo = timeoutRepo;
+  }
+
+  @Override
+  public void save(TxTimeout event) {
+    timeoutRepo.save(event);
+  }
+
+  @Override
+  public void markTxTimeoutAsDone(String globalTxId, String localTxId) {
+    timeoutRepo.updateStatusByGlobalTxIdAndLocalTxId(DONE.name(), globalTxId, 
localTxId);
+  }
+
+  @Transactional
+  @Override
+  public List<TxEvent> findFirstTimeoutTxToAbort() {
+    List<TxEvent> timeoutEvents = 
timeoutRepo.findFirstTimeoutTxOrderByExpireTimeAsc(new PageRequest(0, 1));
+    List<TxEvent> pendingTimeoutEvents = new ArrayList<>();
+    timeoutEvents.forEach(event -> {
+      if 
(timeoutRepo.updateStatusFromNewByGlobalTxIdAndLocalTxId(PENDING.name(), 
event.globalTxId(), event.localTxId())
+          != 0) {
+        pendingTimeoutEvents.add(event);
+      }
+    });
+    return pendingTimeoutEvents;
+  }
+}
\ No newline at end of file
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index e46b264..c4984f9 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -40,7 +40,7 @@ interface TxEventEnvelopeRepository extends 
CrudRepository<TxEvent, Long> {
 
   @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
       + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, 
"
-      + "t.type, t.compensationMethod, t.expireTime, t.payloads"
+      + "t.type, t.compensationMethod, t.payloads"
       + ") FROM TxEvent t "
       + "WHERE t.globalTxId = ?1 AND t.type = ?2")
   List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String 
type);
@@ -78,26 +78,13 @@ interface TxEventEnvelopeRepository extends 
CrudRepository<TxEvent, Long> {
 
   Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long 
surrogateId);
 
-  @Query("SELECT t FROM TxEvent t "
-      + "WHERE t.type IN ('TxStartedEvent', 'SagaStartedEvent') "
-      + "  AND t.expireTime IS NOT NULL "
-      + "  AND t.expireTime < CURRENT_TIMESTAMP "
-      + "  AND t.surrogateId > ?1 AND NOT EXISTS ("
-      + "  SELECT t1.globalTxId"
-      + "  FROM TxEvent t1 "
-      + "  WHERE t1.globalTxId = t.globalTxId "
-      + "  AND t1.localTxId = t.localTxId "
-      + "  AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent')) "
-      + "ORDER BY t.surrogateId ASC")
-  Optional<TxEvent> findFirstTimeoutSurrogateIdGreaterThan(long surrogateId);
-
   @Transactional
   @Modifying(clearAutomatically = true)
   @Query("DELETE FROM TxEvent t "
-      + "WHERE t.type IN ?1 AND t.surrogateId NOT IN ("
+      + "WHERE t.type = ?1 AND t.surrogateId NOT IN ("
       + " SELECT MAX(t1.surrogateId) FROM TxEvent t1 "
-      + " WHERE t1.type = t.type"
+      + " WHERE t1.type = ?1 "
       + " GROUP BY t1.globalTxId"
       + ")")
-  void deleteByTypes(List<String> types);
+  void deleteByType(String type);
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
new file mode 100644
index 0000000..cc39397
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import java.util.List;
+
+import javax.persistence.LockModeType;
+import javax.transaction.Transactional;
+
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.alpha.core.TxTimeout;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Lock;
+import org.springframework.data.jpa.repository.Modifying;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+
+interface TxTimeoutEntityRepository extends CrudRepository<TxTimeout, Long> {
+
+  @Transactional
+  @Modifying(clearAutomatically = true)
+  @Query("UPDATE org.apache.servicecomb.saga.alpha.core.TxTimeout t "
+      + "SET t.status = :status "
+      + "WHERE t.globalTxId = :globalTxId "
+      + "  AND t.localTxId = :localTxId")
+  void updateStatusByGlobalTxIdAndLocalTxId(
+      @Param("status") String status,
+      @Param("globalTxId") String globalTxId,
+      @Param("localTxId") String localTxId);
+
+  @Transactional
+  @Modifying(clearAutomatically = true)
+  @Query("UPDATE org.apache.servicecomb.saga.alpha.core.TxTimeout t "
+      + "SET t.status = :status "
+      + "WHERE t.globalTxId = :globalTxId "
+      + "  AND t.localTxId = :localTxId "
+      + "  AND t.status = 'NEW'")
+  int updateStatusFromNewByGlobalTxIdAndLocalTxId(
+      @Param("status") String status,
+      @Param("globalTxId") String globalTxId,
+      @Param("localTxId") String localTxId);
+
+  @Lock(LockModeType.OPTIMISTIC)
+  @Query("SELECT te FROM TxEvent AS te "
+      + "INNER JOIN TxTimeout AS tt "
+      + "ON te.globalTxId = tt.globalTxId "
+      + "  AND te.localTxId = tt.localTxId "
+      + "  AND tt.status = 'NEW' "
+      + "  AND tt.expireTime < CURRENT_TIMESTAMP "
+      + "ORDER BY tt.expireTime ASC")
+  List<TxEvent> findFirstTimeoutTxOrderByExpireTimeAsc(Pageable pageable);
+}
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql 
b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index a3aee1c..484c5e3 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -3,13 +3,13 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   serviceName varchar(16) NOT NULL,
   instanceId varchar(36) NOT NULL,
   creationTime timestamp(6) NOT NULL DEFAULT CURRENT_DATE,
-  expireTime timestamp(6) NULL,
   globalTxId varchar(36) NOT NULL,
   localTxId varchar(36) NOT NULL,
   parentTxId varchar(36) DEFAULT NULL,
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
-  payloads bytea
+  payloads bytea,
+  version bigint NOT NULL
 );
 
 CREATE INDEX IF NOT EXISTS saga_events_index ON TxEvent (surrogateId, 
globalTxId, localTxId, type);
@@ -31,3 +31,15 @@ CREATE TABLE IF NOT EXISTS Command (
 );
 
 CREATE INDEX IF NOT EXISTS saga_commands_index ON Command (surrogateId, 
eventId, globalTxId, localTxId, status);
+
+
+CREATE TABLE IF NOT EXISTS TxTimeout (
+  surrogateId BIGSERIAL PRIMARY KEY,
+  globalTxId varchar(36) NOT NULL,
+  localTxId varchar(36) NOT NULL,
+  expireTime TIMESTAMP NOT NULL,
+  status varchar(12),
+  version bigint NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS saga_timeouts_index ON TxTimeout (surrogateId, 
expireTime, globalTxId, localTxId, status);
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 585e755..0073648 100644
--- 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -19,6 +19,8 @@ package org.apache.servicecomb.saga.alpha.server;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
@@ -31,7 +33,6 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -48,6 +49,8 @@ import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
+import org.apache.servicecomb.saga.alpha.core.TxTimeout;
+import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
@@ -108,12 +111,18 @@ public class AlphaIntegrationTest {
   private CommandRepository commandRepository;
 
   @Autowired
-  private CommandEntityRepository commandEntityRepository;
+  private TxTimeoutRepository timeoutRepository;
+
+  @Autowired
+  private TxTimeoutEntityRepository timeoutEntityRepository;
 
   @Autowired
   private OmegaCallback omegaCallback;
 
   @Autowired
+  private CommandEntityRepository commandEntityRepository;
+
+  @Autowired
   private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
   @Autowired
@@ -145,6 +154,7 @@ public class AlphaIntegrationTest {
       try {
         eventRepo.deleteAll();
         commandEntityRepository.deleteAll();
+        timeoutEntityRepository.deleteAll();
         deleted = true;
       } catch (Exception ignored) {
       }
@@ -373,12 +383,22 @@ public class AlphaIntegrationTest {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, 
globalTxId, null, 1));
 
-    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3);
+    assertThat(timeoutEntityRepository.count(), is(1L));
+    TxTimeout timeout = timeoutEntityRepository.findOne(1L);
+    assertThat(timeout.status(), is(NEW.name()));
+
+    await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3);
 
     List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
     assertThat(events.get(0).type(), is(SagaStartedEvent.name()));
     assertThat(events.get(1).type(), is(TxAbortedEvent.name()));
     assertThat(events.get(2).type(), is(SagaEndedEvent.name()));
+
+    assertThat(timeoutEntityRepository.count(), is(1L));
+    timeout = timeoutEntityRepository.findOne(1L);
+    assertThat(timeout.status(), is(DONE.name()));
+    assertThat(timeout.globalTxId(), is(globalTxId));
+    assertThat(timeout.localTxId(), is(globalTxId));
   }
 
   @Test
@@ -387,13 +407,19 @@ public class AlphaIntegrationTest {
     blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, 
globalTxId));
     blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, 
globalTxId, 1));
 
-    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4);
+    await().atMost(2, SECONDS).until(() -> eventRepo.count() == 4);
 
     List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
     assertThat(events.get(0).type(), is(SagaStartedEvent.name()));
     assertThat(events.get(1).type(), is(TxStartedEvent.name()));
     assertThat(events.get(2).type(), is(TxAbortedEvent.name()));
     assertThat(events.get(3).type(), is(SagaEndedEvent.name()));
+
+    assertThat(timeoutEntityRepository.count(), is(1L));
+    TxTimeout timeout = timeoutEntityRepository.findOne(1L);
+    assertThat(timeout.status(), is(DONE.name()));
+    assertThat(timeout.globalTxId(), is(globalTxId));
+    assertThat(timeout.localTxId(), is(localTxId));
   }
 
   private GrpcAck onCompensation(GrpcCompensateCommand command) {
@@ -416,13 +442,11 @@ public class AlphaIntegrationTest {
     return new TxEvent(
         serviceName,
         instanceId,
-        new Date(),
         globalTxId,
         localTxId,
         parentTxId,
         TxAbortedEvent.name(),
         compensationMethod,
-        null,
         payload.getBytes());
   }
 
@@ -508,7 +532,7 @@ public class AlphaIntegrationTest {
         Executors.newSingleThreadScheduledExecutor(),
         eventRepository,
         commandRepository,
-        omegaCallback,
-        1).run();
+        timeoutRepository,
+        omegaCallback, 1).run();
   }
 }
diff --git a/alpha/alpha-server/src/test/resources/schema.sql 
b/alpha/alpha-server/src/test/resources/schema.sql
index 53bcd1e..0958389 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -3,14 +3,13 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   serviceName varchar(36) NOT NULL,
   instanceId varchar(36) NOT NULL,
   creationTime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
-  expireTime TIMESTAMP NULL,
   globalTxId varchar(36) NOT NULL,
   localTxId varchar(36) NOT NULL,
   parentTxId varchar(36) DEFAULT NULL,
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   payloads varbinary(10240),
---  version bigint NOT NULL
+  version bigint NOT NULL
 );
 
 CREATE TABLE IF NOT EXISTS Command (
@@ -27,3 +26,12 @@ CREATE TABLE IF NOT EXISTS Command (
   lastModified TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
   version bigint NOT NULL
 );
+
+CREATE TABLE IF NOT EXISTS TxTimeout (
+  surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT 
BY 1) PRIMARY KEY,
+  globalTxId varchar(36) NOT NULL,
+  localTxId varchar(36) NOT NULL,
+  expireTime TIMESTAMP NOT NULL,
+  status varchar(12),
+  version bigint NOT NULL
+);
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 7328fef..388f237 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -47,14 +47,13 @@ public class SagaStartAspect {
     initializeOmegaContext();
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
-    OnceAwareInterceptor interceptor = new 
OnceAwareInterceptor(sagaStartAnnotationProcessor);
-    interceptor.preIntercept(context.globalTxId(), method.toString(), 
sagaStart.timeout());
+    sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), 
method.toString(), sagaStart.timeout());
     LOG.debug("Initialized context {} before execution of method {}", context, 
method.toString());
 
     try {
       Object result = joinPoint.proceed();
 
-      interceptor.postIntercept(context.globalTxId(), method.toString());
+      sagaStartAnnotationProcessor.postIntercept(context.globalTxId(), 
method.toString());
       LOG.debug("Transaction with context {} has finished.", context);
 
       return result;
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
index 0a87491..afdb958 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
@@ -39,7 +39,6 @@ public class OnceAwareInterceptorTest {
 
   private final AtomicInteger postInterceptInvoked = new AtomicInteger();
   private final AtomicInteger onErrorInvoked = new AtomicInteger();
-  private final AtomicInteger onTimeoutInvoked = new AtomicInteger();
 
   private final EventAwareInterceptor underlying = new EventAwareInterceptor() 
{
     @Override
@@ -54,11 +53,7 @@ public class OnceAwareInterceptorTest {
 
     @Override
     public void onError(String parentTxId, String compensationMethod, 
Throwable throwable) {
-      if (throwable instanceof OmegaTxTimeoutException) {
-        onTimeoutInvoked.incrementAndGet();
-      } else {
-        onErrorInvoked.incrementAndGet();
-      }
+      onErrorInvoked.incrementAndGet();
     }
   };
 

-- 
To stop receiving notification emails like this one, please contact
ningji...@apache.org.

Reply via email to