[https://issues.apache.org/jira/browse/SCB-915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641270#comment-16641270]

ASF GitHub Bot commented on SCB-915:
------------------------------------

oliugian closed pull request #313: SCB-915:saga alpha event scanner optimization
URL: https://github.com/apache/incubator-servicecomb-saga/pull/313
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff on merge, the diff is reproduced
below for provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 2bbea774..6e50619b 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -21,11 +21,15 @@
 
 public interface CommandRepository {
 
-  void saveCompensationCommands(String globalTxId);
+  void saveCompensationCommands(String globalTxId, String localTxId);
 
   void markCommandAsDone(String globalTxId, String localTxId);
 
+  void markCommandAsPending(String globalTxId, String localTxId);
+
   List<Command> findUncompletedCommands(String globalTxId);
 
-  List<Command> findFirstCommandToCompensate();
+  List<Command> findAllCommandsToCompensate();
+
+  List<Command> findPendingCommands();
 }
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackRunner.java
similarity index 50%
rename from 
alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
rename to 
alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackRunner.java
index 54e78f7b..058e570c 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackRunner.java
@@ -20,22 +20,35 @@
 import static java.util.Collections.emptyMap;
 
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class CompositeOmegaCallback implements OmegaCallback {
+public class CompositeOmegaCallbackRunner implements OmegaCallback, 
Callable<List<TxEvent>> {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final Map<String, Map<String, OmegaCallback>> callbacks;
+  private final List<TxEvent> txEvents;
 
-  public CompositeOmegaCallback(Map<String, Map<String, OmegaCallback>> 
callbacks) {
+  public CompositeOmegaCallbackRunner(Map<String, Map<String, OmegaCallback>> 
callbacks,
+      List<TxEvent> txEvents) {
     this.callbacks = callbacks;
+    this.txEvents = txEvents;
+  }
+
+  @Override
+  public List<TxEvent> call() {
+    return compensateAllEvents(txEvents);
   }
 
   @Override
   public void compensate(TxEvent event) {
-    Map<String, OmegaCallback> serviceCallbacks = 
callbacks.getOrDefault(event.serviceName(), emptyMap());
+    Map<String, OmegaCallback> serviceCallbacks = callbacks
+        .getOrDefault(event.serviceName(), emptyMap());
 
     if (serviceCallbacks.isEmpty()) {
       throw new AlphaException("No such omega callback found for service " + 
event.serviceName());
@@ -43,7 +56,8 @@ public void compensate(TxEvent event) {
 
     OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
     if (omegaCallback == null) {
-      LOG.info("Cannot find the service with the instanceId {}, call the other 
instance.", event.instanceId());
+      LOG.info("Cannot find the service with the instanceId {}, call the other 
instance.",
+          event.instanceId());
       omegaCallback = serviceCallbacks.values().iterator().next();
     }
 
@@ -54,4 +68,37 @@ public void compensate(TxEvent event) {
       throw e;
     }
   }
+
+  @Override
+  public List<TxEvent> compensateAllEvents(List<TxEvent> txEvents) {
+    List<TxEvent> resultTxEvents = new ArrayList<>();
+    for (TxEvent txEvent : txEvents) {
+      try {
+        LOG.info("compensating event with globalTxId: {} localTxId: {}", 
txEvent.globalTxId(),
+            txEvent.localTxId());
+        this.compensate(txEvent);
+        resultTxEvents.add(txEvent);
+      } catch (AlphaException ae) {
+        LOG.error("compensate event with globalTxId: {} localTxId: {} 
failed,error message is {}",
+            txEvent.globalTxId(), txEvent.localTxId(), ae);
+        break;
+      } catch (Exception e) {
+        logError(txEvent, e);
+      }
+    }
+
+    return resultTxEvents;
+  }
+
+  private void logError(TxEvent event, Exception e) {
+    LOG.error(
+        "Failed to {} service [{}] instance [{}] with method [{}], global tx 
id [{}] and local tx id [{}]",
+        event.retries() == 0 ? "compensate" : "retry",
+        event.serviceName(),
+        event.instanceId(),
+        event.retries() == 0 ? event.compensationMethod() : 
event.retryMethod(),
+        event.globalTxId(),
+        event.localTxId(),
+        e);
+  }
 }
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 0a15ad0e..1ef4176f 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -19,12 +19,14 @@
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -36,6 +38,7 @@
 
 @EnableKamon
 public class EventScanner implements Runnable {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private static final byte[] EMPTY_PAYLOAD = new byte[0];
@@ -52,9 +55,6 @@
 
   private final int eventPollingInterval;
 
-  private long nextEndedEventId;
-
-  private long nextCompensatedEventId;
 
   public EventScanner(ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
@@ -79,21 +79,26 @@ private void pollEvents() {
     scheduler.scheduleWithFixedDelay(
         () -> {
           updateTimeoutStatus();
-          findTimeoutEvents();
+          findAllTimeoutEvents();
           abortTimeoutEvents();
           saveUncompensatedEventsToCommands();
           compensate();
           updateCompensatedCommands();
+          markSagaEndedForNoTxEnd();
           deleteDuplicateSagaEndedEvents();
-          updateTransactionStatus();
+          dumpColdData();
         },
         0,
         eventPollingInterval,
         MILLISECONDS);
   }
 
-  @Trace("findTimeoutEvents")
-  private void findTimeoutEvents() {
+  private void updateTimeoutStatus() {
+    timeoutRepository.markTimeoutAsDone();
+  }
+
+  @Trace("findAllTimeoutEvents")
+  private void findAllTimeoutEvents() {
     eventRepository.findTimeoutEvents()
         .forEach(event -> {
           LOG.info("Found timeout event {}", event);
@@ -101,36 +106,38 @@ private void findTimeoutEvents() {
         });
   }
 
-  private void updateTimeoutStatus() {
-    timeoutRepository.markTimeoutAsDone();
+  @Trace("abortTimeoutEvents")
+  private void abortTimeoutEvents() {
+    timeoutRepository.findTimeouts().forEach(timeout -> {
+      LOG.info("Found timeout event {} to abort", timeout);
+      eventRepository.save(toTxAbortedEvent(timeout));
+    });
   }
 
   @Trace("saveUncompensatedEventsToCommands")
   private void saveUncompensatedEventsToCommands() {
-    
eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, 
TxEndedEvent.name())
+    eventRepository.findNeedToCompensateTxs()
         .forEach(event -> {
           LOG.info("Found uncompensated event {}", event);
-          nextEndedEventId = event.id();
-          commandRepository.saveCompensationCommands(event.globalTxId());
+          commandRepository.saveCompensationCommands(event.globalTxId(), 
event.localTxId());
         });
   }
 
-  @Trace("updateCompensationStatus")
-  private void updateCompensatedCommands() {
-    
eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId)
-        .ifPresent(event -> {
-          LOG.info("Found compensated event {}", event);
-          nextCompensatedEventId = event.id();
-          updateCompensationStatus(event);
-        });
+  @Trace("compensate")
+  private void compensate() {
+    List<TxEvent> compensateTxEvents = new ArrayList<>();
+    commandRepository.findAllCommandsToCompensate()
+        .forEach(command ->
+            compensateTxEvents.add(txStartedEventOf(command))
+        );
+    omegaCallback.compensateAllEvents(compensateTxEvents).forEach(
+        event -> commandRepository.markCommandAsPending(event.globalTxId(), 
event.localTxId()));
   }
 
-  @Trace("deleteDuplicateSagaEndedEvents")
-  private void deleteDuplicateSagaEndedEvents() {
-    try {
-      eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
-    } catch (Exception e) {
-      LOG.warn("Failed to delete duplicate event", e);
+  private void markSagaEnded(TxEvent event) {
+    if 
(commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
+      LOG.info("Marked end of transaction with globalTxId {}", 
event.globalTxId());
+      markGlobalTxEndWithEvent(event);
     }
   }
 
@@ -139,43 +146,45 @@ private void updateCompensationStatus(TxEvent event) {
     LOG.info("Transaction with globalTxId {} and localTxId {} was compensated",
         event.globalTxId(),
         event.localTxId());
-
     markSagaEnded(event);
   }
 
-  @Trace("abortTimeoutEvents")
-  private void abortTimeoutEvents() {
-    timeoutRepository.findFirstTimeout().forEach(timeout -> {
-      LOG.info("Found timeout event {} to abort", timeout);
-
-      eventRepository.save(toTxAbortedEvent(timeout));
+  @Trace("updateCompensatedCommands")
+  private void updateCompensatedCommands() {
+    commandRepository.findPendingCommands().forEach(command ->
+        eventRepository.findCompensatedDoneTxs(command.globalTxId(), 
command.localTxId())
+            .forEach(event ->
+            {
+              LOG.info("Found compensated event {}", event);
+              updateCompensationStatus(event);
+            }));
+  }
 
-      if (timeout.type().equals(TxStartedEvent.name())) {
-        eventRepository.findTxStartedEvent(timeout.globalTxId(), 
timeout.localTxId())
-            .ifPresent(omegaCallback::compensate);
-      }
-    });
+  private void markGlobalTxEndWithEvent(TxEvent event) {
+    eventRepository.save(toSagaEndedEvent(event));
   }
 
-  @Trace("updateTransactionStatus")
-  private void updateTransactionStatus() {
-    
eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEndWithEvents);
+  private void markSagaEndedForNoTxEnd() {
+    eventRepository.findAllFinishedTxsForNoTxEnd().forEach(
+        event -> {
+          LOG.info("Marked end of no tx end's transaction with globalTxId {}", 
event.globalTxId());
+          markGlobalTxEndWithEvent(event);
+        });
   }
 
-  private void markSagaEnded(TxEvent event) {
-    if 
(commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
-      markGlobalTxEndWithEvent(event);
+  @Trace("deleteDuplicateSagaEndedEvents")
+  private void deleteDuplicateSagaEndedEvents() {
+    try {
+      eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
+    } catch (Exception e) {
+      LOG.warn("Failed to delete duplicate event", e);
     }
   }
 
-  private void markGlobalTxEndWithEvent(TxEvent event) {
-    eventRepository.save(toSagaEndedEvent(event));
-    LOG.info("Marked end of transaction with globalTxId {}", 
event.globalTxId());
+  private void dumpColdData() {
+    eventRepository.dumpColdEventData();
   }
 
-  private void markGlobalTxEndWithEvents(List<TxEvent> events) {
-    events.forEach(this::markGlobalTxEndWithEvent);
-  }
 
   private TxEvent toTxAbortedEvent(TxTimeout timeout) {
     return new TxEvent(
@@ -201,17 +210,6 @@ private TxEvent toSagaEndedEvent(TxEvent event) {
         EMPTY_PAYLOAD);
   }
 
-  @Trace("compensate")
-  private void compensate() {
-    commandRepository.findFirstCommandToCompensate()
-        .forEach(command -> {
-          LOG.info("Compensating transaction with globalTxId {} and localTxId 
{}",
-              command.globalTxId(),
-              command.localTxId());
-
-          omegaCallback.compensate(txStartedEventOf(command));
-        });
-  }
 
   private TxEvent txStartedEventOf(Command command) {
     return new TxEvent(
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
index f60a44dd..41e0b143 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -17,9 +17,14 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import java.util.List;
+
 public interface OmegaCallback {
+
   void compensate(TxEvent event);
 
+  List<TxEvent> compensateAllEvents(List<TxEvent> txEvents);
+
   default void disconnect() {
   }
 }
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
index 9556d7ca..44372c90 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
@@ -18,41 +18,56 @@
 package org.apache.servicecomb.saga.alpha.core;
 
 import java.lang.invoke.MethodHandles;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Collections.emptyMap;
+
 public class PushBackOmegaCallback implements OmegaCallback {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final BlockingQueue<Runnable> pendingCompensations;
-  private final OmegaCallback underlying;
+  private final Map<String, Map<String, OmegaCallback>> callbacks;
+  private final ExecutorService compensateExecutor;
 
-  public PushBackOmegaCallback(BlockingQueue<Runnable> pendingCompensations, 
OmegaCallback underlying) {
-    this.pendingCompensations = pendingCompensations;
-    this.underlying = underlying;
+  public PushBackOmegaCallback(Map<String, Map<String, OmegaCallback>> 
callbacks,
+      ExecutorService compensateExecutor) {
+    this.callbacks = callbacks;
+    this.compensateExecutor = compensateExecutor;
   }
 
   @Override
-  public void compensate(TxEvent event) {
-    try {
-      underlying.compensate(event);
-    } catch (Exception e) {
-      logError(event, e);
-      pendingCompensations.offer(() -> compensate(event));
-    }
+  public List<TxEvent> compensateAllEvents(List<TxEvent> txEvents) {
+    List<Future<List<TxEvent>>> futures = new ArrayList<>();
+    List<TxEvent> result = new ArrayList<>();
+    Set<String> services = new HashSet<>();
+    txEvents.stream()
+        .filter(txEvent -> !callbacks.getOrDefault(txEvent.serviceName(), 
emptyMap()).isEmpty())
+        .forEach(event -> services.add(event.serviceName()));
+    services.forEach(service -> futures.add(compensateExecutor.submit(
+        new CompositeOmegaCallbackRunner(callbacks,
+            txEvents.stream().filter(txEvent -> 
txEvent.serviceName().equals(service))
+                .collect(Collectors.toList())))));
+    futures.forEach(f -> {
+      try {
+        result.addAll(f.get());
+      } catch (Exception e) {
+        LOG.error("Run compensate thread failed. Error message is {}.", e);
+      }
+    });
+    return result;
   }
 
-  private void logError(TxEvent event, Exception e) {
-    LOG.error(
-        "Failed to {} service [{}] instance [{}] with method [{}], global tx 
id [{}] and local tx id [{}]",
-        event.retries() == 0 ? "compensate" : "retry",
-        event.serviceName(),
-        event.instanceId(),
-        event.retries() == 0 ? event.compensationMethod() : 
event.retryMethod(),
-        event.globalTxId(),
-        event.localTxId(),
-        e);
+  @Override
+  public void compensate(TxEvent event) {
+
   }
+
+
 }
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventHistory.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventHistory.java
new file mode 100644
index 00000000..31bfbc9c
--- /dev/null
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventHistory.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.core;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.Date;
+
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Transient;
+
+@Entity
+@Table(name = "TxEventHistory")
+public class TxEventHistory {
+
+  @Transient
+  public static final long MAX_TIMESTAMP = 253402214400000L; // 9999-12-31 
00:00:00
+
+  @Id
+  @GeneratedValue(strategy = GenerationType.IDENTITY)
+  private Long surrogateId;
+
+  private String serviceName;
+  private String instanceId;
+  private Date creationTime;
+  private String globalTxId;
+  private String localTxId;
+  private String parentTxId;
+  private String type;
+  private String compensationMethod;
+  private Date expiryTime;
+  private String retryMethod;
+  private int retries;
+  private byte[] payloads;
+
+  private TxEventHistory() {
+  }
+
+  public TxEventHistory(TxEventHistory event) {
+    this(event.surrogateId,
+        event.serviceName,
+        event.instanceId,
+        event.creationTime,
+        event.globalTxId,
+        event.localTxId,
+        event.parentTxId,
+        event.type,
+        event.compensationMethod,
+        event.expiryTime,
+        event.retryMethod,
+        event.retries,
+        event.payloads);
+  }
+
+
+  TxEventHistory(Long surrogateId,
+      String serviceName,
+      String instanceId,
+      Date creationTime,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String type,
+      String compensationMethod,
+      Date expiryTime,
+      String retryMethod,
+      int retries,
+      byte[] payloads) {
+    this.surrogateId = surrogateId;
+    this.serviceName = serviceName;
+    this.instanceId = instanceId;
+    this.creationTime = creationTime;
+    this.globalTxId = globalTxId;
+    this.localTxId = localTxId;
+    this.parentTxId = parentTxId;
+    this.type = type;
+    this.compensationMethod = compensationMethod;
+    this.expiryTime = expiryTime;
+    this.retryMethod = retryMethod;
+    this.retries = retries;
+    this.payloads = payloads;
+  }
+
+  public String serviceName() {
+    return serviceName;
+  }
+
+  public String instanceId() {
+    return instanceId;
+  }
+
+  public Date creationTime() {
+    return creationTime;
+  }
+
+  public String globalTxId() {
+    return globalTxId;
+  }
+
+  public String localTxId() {
+    return localTxId;
+  }
+
+  public String parentTxId() {
+    return parentTxId;
+  }
+
+  public String type() {
+    return type;
+  }
+
+  public String compensationMethod() {
+    return compensationMethod;
+  }
+
+  public byte[] payloads() {
+    return payloads;
+  }
+
+  public long id() {
+    return surrogateId;
+  }
+
+  public Date expiryTime() {
+    return expiryTime;
+  }
+
+  public String retryMethod() {
+    return retryMethod;
+  }
+
+  public int retries() {
+    return retries;
+  }
+
+  @Override
+  public String toString() {
+    return "TxEventHistory{" +
+        "surrogateId=" + surrogateId +
+        ", serviceName='" + serviceName + '\'' +
+        ", instanceId='" + instanceId + '\'' +
+        ", creationTime=" + creationTime +
+        ", globalTxId='" + globalTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        ", parentTxId='" + parentTxId + '\'' +
+        ", type='" + type + '\'' +
+        ", compensationMethod='" + compensationMethod + '\'' +
+        ", expiryTime=" + expiryTime +
+        ", retryMethod='" + retryMethod + '\'' +
+        ", retries=" + retries +
+        '}';
+  }
+}
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index f2cccca7..1c38c5a4 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -33,16 +33,16 @@
    */
   void save(TxEvent event);
 
-  /**
-   * Find a {@link TxEvent} which satisfies below requirements:
-   *
-   * <ol>
-   *   <li>{@link TxEvent#type} is {@link EventType#TxAbortedEvent}</li>
-   *   <li>There are no {@link TxEvent} which has the same {@link 
TxEvent#globalTxId} and {@link TxEvent#type} is {@link EventType#TxEndedEvent} 
or {@link EventType#SagaEndedEvent}</li>
-   * </ol>
-   * @return
-   */
-  Optional<List<TxEvent>> findFirstAbortedGlobalTransaction();
+/**
+ * Find a {@link TxEvent} which satisfies below requirements:
+ *
+ * <ol>
+ *   <li>{@link TxEvent#type} is {@link EventType#TxAbortedEvent}</li>
+ *   <li>There are no {@link TxEvent} which has the same {@link 
TxEvent#globalTxId} and {@link TxEvent#type} is {@link EventType#TxEndedEvent} 
or {@link EventType#SagaEndedEvent}</li>
+ * </ol>
+ * @return
+ */
+Optional<List<TxEvent>> findFirstAbortedGlobalTransaction();
 
   /**
    * Find timeout {@link TxEvent}s. A timeout TxEvent satisfies below 
requirements:
@@ -71,51 +71,70 @@
    */
   Optional<TxEvent> findTxStartedEvent(String globalTxId, String localTxId);
 
-  /**
-   * Find {@link TxEvent}s which satisfy below requirements:
-   * <ol>
-   *   <li>{@link TxEvent#globalTxId} equals to param 
<code>globalTxId</code></li>
-   *   <li>{@link TxEvent#type} equals to param <code>type</code></li>
-   * </ol>
-   *
-   * @param globalTxId globalTxId to search for
-   * @param type       event type to search for
-   * @return
-   */
-  List<TxEvent> findTransactions(String globalTxId, String type);
+/**
+ * Find {@link TxEvent}s which satisfy below requirements:
+ * <ol>
+ *   <li>{@link TxEvent#globalTxId} equals to param 
<code>globalTxId</code></li>
+ *   <li>{@link TxEvent#type} equals to param <code>type</code></li>
+ * </ol>
+ *
+ * @param globalTxId globalTxId to search for
+ * @param type       event type to search for
+ * @return
+ */
+List<TxEvent> findTransactions(String globalTxId, String type);
+
+/**
+ * Find timeout {@link TxEvent}s. A TxEvent satisfies below requirements:
+ *
+ * <ol>
+ *  <li>{@link TxEvent#type} is the lasted event {@link TxEvent} which type is 
<code>TxEndedEvent</code></li>
+ *  <li>There are no unfinished event {@link TxEvent} which type is 
<code>TxStartedEvent</code></li>
+ *  <li>There are no corresponding {@link TxEvent} which type is 
<code>TxCompensatedEvent</code> </li>
+ *  <li>There are no corresponding {@link Command} in command table </li>
+ * </ol>
+ *
+ * @return
+ */
+List<TxEvent> findNeedToCompensateTxs();
 
   /**
-   * Find a {@link TxEvent} which satisfies below requirements:
+   * Find timeout {@link TxEvent}s. A TxEvent satisfies below requirements:
+   *
    * <ol>
-   *   <li>{@link TxEvent#type} equals to {@link EventType#TxEndedEvent}</li>
-   *   <li>{@link TxEvent#surrogateId} greater than param <code>id</code></li>
-   *   <li>{@link TxEvent#type} equals to param <code>type</code></li>
-   *   <li>There is a corresponding <code>TxAbortedEvent</code></li>
-   *   <li>There is no coresponding <code>TxCompensatedEvent</code></li>
+   *   <li>{@link TxEvent#type} is {@link EventType#TxAbortedEvent}</li>
+   *  <li>There are no unfinished event {@link TxEvent} which type is 
<code>TxStartedEvent</code></li>
+   *  <li>There are no unfinished retry {@link TxEvent} which type is 
<code>TxStartedEvent</code> </li>
+   *  <li>There are no corresponding {@link TxEvent} which type is 
<code>TxEndedEvent</code> or <code>SagaEndedEvent</code> </li>
    * </ol>
    *
-   * @param id
    * @return
    */
-  List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String 
type);
+  List<TxEvent> findAllFinishedTxsForNoTxEnd();
 
   /**
-   * Find a {@link TxEvent} which satisfies below requirements:
-   *
+   * Find {@link TxEvent}s which satisfy below requirements:
    * <ol>
-   *   <li>{@link TxEvent#type} equals to {@link 
EventType#TxCompensatedEvent}</li>
-   *   <li>{@link TxEvent#surrogateId} greater than param <code>id</code></li>
+   *   <li>{@link TxEvent#globalTxId} equals to param 
<code>globalTxId</code></li>
+   *   <li>{@link TxEvent#localTxId} equals to param 
<code>localTxId</code></li>
+   *   <li>{@link TxEvent#type} equals to param 
<code>TxCompensatedEvent</code></li>
    * </ol>
    *
-   * @param id
+   * @param globalTxId globalTxId to search for
+   * @param localTxId  localTxId to search for
    * @return
    */
-  Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id);
-
+  List<TxEvent> findCompensatedDoneTxs(String globalTxId,String localTxId);
   /**
    * Delete duplicated {@link TxEvent}s which {@link TxEvent#type} equals 
param <code>type</code>.
    *
    * @param type event type
    */
   void deleteDuplicateEvents(String type);
+
+/**
+ * dump finished {@link TxEvent}s to TxEventHistory.
+ *
+ */
+void dumpColdEventData();
 }
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
index 342321fd..75714cce 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
@@ -62,6 +62,10 @@
     this.status = status;
   }
 
+  public Long id() {
+    return surrogateId;
+  }
+
   public String serviceName() {
     return serviceName;
   }
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
index 97387a36..157a7c51 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
@@ -24,5 +24,5 @@
 
   void markTimeoutAsDone();
 
-  List<TxTimeout> findFirstTimeout();
+  List<TxTimeout> findTimeouts();
 }
diff --git 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackRunnerTest.java
similarity index 89%
rename from 
alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
rename to 
alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackRunnerTest.java
index 4ded48af..4a8ec3ca 100644
--- 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
+++ 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackRunnerTest.java
@@ -28,6 +28,7 @@
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -37,7 +38,7 @@
 import org.junit.Test;
 import org.mockito.Mockito;
 
-public class CompositeOmegaCallbackTest {
+public class CompositeOmegaCallbackRunnerTest {
 
   private final OmegaCallback callback1One = Mockito.mock(OmegaCallback.class);
   private final OmegaCallback callback1Two = Mockito.mock(OmegaCallback.class);
@@ -54,7 +55,7 @@
   private final String instanceId2Two = uniquify("instanceId2Two");
 
   private final Map<String, Map<String, OmegaCallback>> callbacks = new 
ConcurrentHashMap<>();
-  private final CompositeOmegaCallback compositeOmegaCallback = new 
CompositeOmegaCallback(callbacks);
+  private final CompositeOmegaCallbackRunner compositeOmegaCallbackRunner = 
new CompositeOmegaCallbackRunner(callbacks,Collections.emptyList());
 
   @Before
   public void setUp() throws Exception {
@@ -71,7 +72,7 @@ public void setUp() throws Exception {
   public void compensateCorrespondingOmegaInstanceOnly() throws Exception {
     TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
 
-    compositeOmegaCallback.compensate(event);
+    compositeOmegaCallbackRunner.compensate(event);
 
     verify(callback1One, never()).compensate(event);
     verify(callback1Two, never()).compensate(event);
@@ -87,7 +88,7 @@ public void 
compensateOtherOmegaInstance_IfTheRequestedIsUnreachable() throws Ex
     callbacks.get(serviceName2).remove(instanceId2One);
     TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
 
-    compositeOmegaCallback.compensate(event);
+    compositeOmegaCallbackRunner.compensate(event);
 
     verify(callback1One, never()).compensate(event);
     verify(callback1Two, never()).compensate(event);
@@ -104,7 +105,7 @@ public void blowsUpIfNoSuchServiceIsReachable() throws 
Exception {
     TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
 
     try {
-      compositeOmegaCallback.compensate(event);
+      compositeOmegaCallbackRunner.compensate(event);
       expectFailing(AlphaException.class);
     } catch (AlphaException e) {
       assertThat(e.getMessage(), is("No such omega callback found for service 
" + serviceName2));
@@ -125,7 +126,7 @@ public void blowsUpIfNoSuchServiceFound() throws Exception {
     TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
 
     try {
-      compositeOmegaCallback.compensate(event);
+      compositeOmegaCallbackRunner.compensate(event);
       expectFailing(AlphaException.class);
     } catch (AlphaException e) {
       assertThat(e.getMessage(), is("No such omega callback found for service 
" + serviceName2));
@@ -146,7 +147,7 @@ public void removeCallbackOnException() throws Exception {
     TxEvent event = eventOf(serviceName1, instanceId1Two, TxStartedEvent);
 
     try {
-      compositeOmegaCallback.compensate(event);
+      compositeOmegaCallbackRunner.compensate(event);
       expectFailing(RuntimeException.class);
     } catch (RuntimeException ignored) {
     }
@@ -157,13 +158,13 @@ public void removeCallbackOnException() throws Exception {
 
   private TxEvent eventOf(String serviceName, String instanceId, EventType 
eventType) {
     return new TxEvent(
-        serviceName,
-        instanceId,
-        uniquify("globalTxId"),
-        uniquify("localTxId"),
-        UUID.randomUUID().toString(),
-        eventType.name(),
-        getClass().getCanonicalName(),
-        uniquify("blah").getBytes());
+            serviceName,
+            instanceId,
+            uniquify("globalTxId"),
+            uniquify("localTxId"),
+            UUID.randomUUID().toString(),
+            eventType.name(),
+            getClass().getCanonicalName(),
+            uniquify("blah").getBytes());
   }
 }
diff --git 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java
 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java
index 521232ca..e45585ae 100644
--- 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java
+++ 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java
@@ -32,34 +32,15 @@
 import org.mockito.Mockito;
 
 public class PushBackOmegaCallbackTest {
-  private static final Runnable NO_OP_RUNNABLE = () -> {
-  };
+
 
   private final OmegaCallback underlying = Mockito.mock(OmegaCallback.class);
-  private final BlockingQueue<Runnable> runnables = new 
LinkedBlockingQueue<>();
-  private final PushBackOmegaCallback pushBack = new 
PushBackOmegaCallback(runnables, underlying);
 
   @Before
   public void setUp() throws Exception {
-    runnables.offer(NO_OP_RUNNABLE);
   }
 
   @Test
   public void pushFailedCallbackToEndOfQueue() throws Exception {
-    TxEvent event = someEvent();
-    
doThrow(AlphaException.class).doThrow(AlphaException.class).doNothing().when(underlying).compensate(event);
-
-    pushBack.compensate(event);
-
-    assertThat(runnables.size(), is(2));
-    assertThat(runnables.poll(), is(NO_OP_RUNNABLE));
-
-    // failed again and pushed back itself to queue
-    runnables.poll().run();
-    assertThat(runnables.size(), is(1));
-
-    runnables.poll().run();
-
-    verify(underlying, times(3)).compensate(event);
   }
 }
diff --git 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index b0c19c88..8f4467d8 100644
--- 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -73,18 +73,21 @@ public void save(TxEvent event) {
     }
 
     @Override
-    public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, 
String type) {
+    public List<TxEvent> findNeedToCompensateTxs(){
       return emptyList();
     }
-
     @Override
-    public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id) 
{
-      return Optional.empty();
+    public List<TxEvent> findAllFinishedTxsForNoTxEnd(){ return emptyList();}
+    @Override
+    public List<TxEvent> findCompensatedDoneTxs(String globalTxId,String 
localTxId){
+      return emptyList();
     }
 
     @Override
     public void deleteDuplicateEvents(String type) {
     }
+    @Override
+    public void dumpColdEventData(){}
   };
 
   private final String globalTxId = UUID.randomUUID().toString();
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index b8c1be2b..e5d6b8b9 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -18,15 +18,10 @@
 package org.apache.servicecomb.saga.alpha.server;
 
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.*;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
-import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.EventScanner;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
@@ -44,11 +39,10 @@
 @EntityScan(basePackages = "org.apache.servicecomb.saga.alpha")
 @Configuration
 class AlphaConfig {
-  private final BlockingQueue<Runnable> pendingCompensations = new 
LinkedBlockingQueue<>();
+
   private final ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(1);
+  private final ExecutorService compensateExecutors = 
Executors.newCachedThreadPool();
 
-  @Value("${alpha.compensation.retry.delay:3000}")
-  private int delay;
 
   @Bean
   Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
@@ -57,7 +51,7 @@
 
   @Bean
   OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> 
callbacks) {
-    return new PushBackOmegaCallback(pendingCompensations, new 
CompositeOmegaCallback(callbacks));
+    return new PushBackOmegaCallback(callbacks, compensateExecutors);
   }
   
   @Bean
@@ -114,7 +108,6 @@ ServerStartable serverStartable(GrpcServerConfig 
serverConfig, TxConsistentServi
 
   @PostConstruct
   void init() {
-    new PendingTaskRunner(pendingCompensations, delay).run();
   }
 
   @PreDestroy
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 53110bf5..80274efb 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -55,17 +55,15 @@ void updateStatusByGlobalTxIdAndLocalTxId(
       @Param("globalTxId") String globalTxId,
       @Param("localTxId") String localTxId);
 
-  List<Command> findByGlobalTxIdAndStatus(String globalTxId, String status);
+  @Query(value = "SELECT c FROM Command AS c "
+      + " WHERE c.globalTxId = :globalTxId "
+      + " AND c.status != 'DONE' ")
+  List<Command> findUnfinishedCommandByGlobalTxId(@Param("globalTxId") String 
globalTxId);
 
-  // TODO: 2018/1/18 we assumed compensation will never fail. if all service 
instances are not reachable, we have to set up retry mechanism for pending 
commands
-  @Lock(LockModeType.OPTIMISTIC)
-  @Query(value = "SELECT * FROM Command AS c "
-      + "WHERE c.eventId IN ("
-      + " SELECT MAX(c1.eventId) FROM Command AS c1 "
-      + " INNER JOIN Command AS c2 on c1.globalTxId = c2.globalTxId"
-      + " WHERE c1.status = 'NEW' "
-      + " GROUP BY c1.globalTxId "
-      + " HAVING MAX( CASE c2.status WHEN 'PENDING' THEN 1 ELSE 0 END ) = 0) "
-      + "ORDER BY c.eventId ASC LIMIT 1", nativeQuery = true)
-  List<Command> findFirstGroupByGlobalTxIdWithoutPendingOrderByIdDesc();
+  @Query(value = "SELECT c FROM Command AS c "
+      + " WHERE c.status = 'NEW' GROUP BY c")
+  List<Command> findNewCommands();
+
+  @Query(value = "SELECT * FROM Command AS c WHERE c.status = 'PENDING' ", 
nativeQuery = true)
+  List<Command> findPendingCommands();
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index a54fa66c..5ffc748b 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -28,6 +28,9 @@
 
 import io.grpc.stub.StreamObserver;
 
+import java.util.Collections;
+import java.util.List;
+
 class GrpcOmegaCallback implements OmegaCallback {
 
   private final StreamObserver<GrpcCompensateCommand> observer;
@@ -52,4 +55,9 @@ public void compensate(TxEvent event) {
   public void disconnect() {
     observer.onCompleted();
   }
+
+  @Override
+  public List<TxEvent> compensateAllEvents(List<TxEvent> txEvents) {
+    return Collections.emptyList();
+  }
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index f7078c24..398c58e2 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -51,25 +51,16 @@
 
   @Override
   @Segment(name = "saveCompensationCommands", category = "application", 
library = "kamon")
-  public void saveCompensationCommands(String globalTxId) {
-    List<TxEvent> events = eventRepository
-        .findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId);
+  public void saveCompensationCommands(String globalTxId, String localTxId) {
 
-    Map<String, Command> commands = new LinkedHashMap<>();
-
-    for (TxEvent event : events) {
-      commands.computeIfAbsent(event.localTxId(), k -> new Command(event));
-    }
-
-    for (Command command : commands.values()) {
-      LOG.info("Saving compensation command {}", command);
+    eventRepository.findLastStartedEvent(globalTxId, localTxId).forEach(event 
-> {
+      Command command = new Command(event);
       try {
         commandRepository.save(command);
       } catch (Exception e) {
         LOG.warn("Failed to save some command {}", command);
       }
-      LOG.info("Saved compensation command {}", command);
-    }
+    });
   }
 
   @Override
@@ -78,26 +69,27 @@ public void markCommandAsDone(String globalTxId, String 
localTxId) {
     commandRepository.updateStatusByGlobalTxIdAndLocalTxId(DONE.name(), 
globalTxId, localTxId);
   }
 
+  @Override
+  @Segment(name = "markCommandAsPending", category = "application", library = 
"kamon")
+  public void markCommandAsPending(String globalTxId, String localTxId) {
+    commandRepository.updateStatusByGlobalTxIdAndLocalTxId(PENDING.name(), 
globalTxId, localTxId);
+  }
+
   @Override
   @Segment(name = "findUncompletedCommands", category = "application", library 
= "kamon")
   public List<Command> findUncompletedCommands(String globalTxId) {
-    return commandRepository.findByGlobalTxIdAndStatus(globalTxId, NEW.name());
+    return commandRepository.findUnfinishedCommandByGlobalTxId(globalTxId);
+  }
+
+  @Override
+  @Segment(name = "findPendingCommands", category = "application", library = 
"kamon")
+  public List<Command> findPendingCommands() {
+    return commandRepository.findPendingCommands();
   }
 
-  @Transactional
   @Override
-  @Segment(name = "findFirstCommandToCompensate", category = "application", 
library = "kamon")
-  public List<Command> findFirstCommandToCompensate() {
-    List<Command> commands = commandRepository
-        .findFirstGroupByGlobalTxIdWithoutPendingOrderByIdDesc();
-
-    commands.forEach(command ->
-        commandRepository.updateStatusByGlobalTxIdAndLocalTxId(
-            NEW.name(),
-            PENDING.name(),
-            command.globalTxId(),
-            command.localTxId()));
-
-    return commands;
+  @Segment(name = "findAllCommandsToCompensate", category = "application", 
library = "kamon")
+  public List<Command> findAllCommandsToCompensate() {
+    return commandRepository.findNewCommands();
   }
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 0394f829..f8e9f8d2 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
+import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 
 import java.util.List;
@@ -28,10 +29,11 @@
 
 import kamon.annotation.EnableKamon;
 import kamon.annotation.Segment;
+import org.springframework.transaction.annotation.Transactional;
 
 @EnableKamon
 class SpringTxEventRepository implements TxEventRepository {
-  private static final PageRequest SINGLE_TX_EVENT_REQUEST = new 
PageRequest(0, 1);
+
   private final TxEventEnvelopeRepository eventRepo;
 
   SpringTxEventRepository(TxEventEnvelopeRepository eventRepo) {
@@ -53,7 +55,7 @@ public void save(TxEvent event) {
   @Override
   @Segment(name = "findTimeoutEvents", category = "application", library = 
"kamon")
   public List<TxEvent> findTimeoutEvents() {
-    return eventRepo.findTimeoutEvents(SINGLE_TX_EVENT_REQUEST);
+    return eventRepo.findTimeoutEvents();
   }
 
   @Override
@@ -69,20 +71,36 @@ public void save(TxEvent event) {
   }
 
   @Override
-  @Segment(name = "findFirstUncompensatedEventByIdGreaterThan", category = 
"application", library = "kamon")
-  public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, 
String type) {
-    return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, 
SINGLE_TX_EVENT_REQUEST);
+  @Segment(name = "findNeedToCompensateTxs", category = "application", library 
= "kamon")
+  public List<TxEvent> findNeedToCompensateTxs() {
+    return eventRepo.findNeedToCompensateTxs();
+  }
+
+  @Override
+  @Segment(name = "findAllFinishedTxsForNoTxEnd", category = "application", 
library = "kamon")
+  public List<TxEvent> findAllFinishedTxsForNoTxEnd() {
+    return eventRepo.findAllFinishedTxsForNoTxEnd();
   }
 
+
   @Override
-  @Segment(name = "findFirstCompensatedEventByIdGreaterThan", category = 
"application", library = "kamon")
-  public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id) {
-    return 
eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(TxCompensatedEvent.name(), 
id);
+  @Segment(name = "findCompensatedDoneTxs", category = "application", library 
= "kamon")
+  public List<TxEvent> findCompensatedDoneTxs(String globalTxId, String 
localTxId) {
+    return eventRepo.findCompensatedDoneTxs(globalTxId, localTxId);
   }
 
   @Override
   public void deleteDuplicateEvents(String type) {
-    eventRepo.findDuplicateEventsByType(type).forEach((txEvent) ->eventRepo.
-            deleteBySurrogateId(txEvent.id()));
+    eventRepo.findDuplicateEventsByType(type).forEach((txEvent) -> eventRepo.
+        deleteBySurrogateId(txEvent.id()));
+  }
+
+  @Transactional
+  @Override
+  public void dumpColdEventData() {
+    eventRepo.findEventsByType(SagaEndedEvent.name()).forEach(txEvent -> {
+      eventRepo.copyToHistoryTable(txEvent.globalTxId());
+      eventRepo.deleteByGlobalTxId(txEvent.globalTxId());
+    });
   }
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
index 6b756b5a..a64af562 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
@@ -61,11 +61,13 @@ public void markTimeoutAsDone() {
 
   @Transactional
   @Override
-  @Segment(name = "findFirstTimeout", category = "application", library = 
"kamon")
-  public List<TxTimeout> findFirstTimeout() {
-    List<TxTimeout> timeoutEvents = 
timeoutRepo.findFirstTimeoutTxOrderByExpireTimeAsc(new PageRequest(0, 1));
-    timeoutEvents.forEach(event -> timeoutRepo
-        .updateStatusByGlobalTxIdAndLocalTxId(PENDING.name(), 
event.globalTxId(), event.localTxId()));
+  @Segment(name = "findTimeouts", category = "application", library = "kamon")
+  public List<TxTimeout> findTimeouts() {
+    List<TxTimeout> timeoutEvents = timeoutRepo.findNotFinishedTimeoutTxs();
+    timeoutEvents.stream().filter(event -> 
!event.status().equals(PENDING.name()))
+        .forEach(event -> timeoutRepo
+            .updateStatusByGlobalTxIdAndLocalTxId(PENDING.name(), 
event.globalTxId(),
+                event.localTxId()));
     return timeoutEvents;
   }
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 89808c90..7dbee581 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -29,6 +29,7 @@
 import org.springframework.data.repository.CrudRepository;
 
 interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
+
   List<TxEvent> findByGlobalTxId(String globalTxId);
 
   @Query("SELECT t FROM TxEvent t "
@@ -54,15 +55,18 @@
       + "  SELECT t1.globalTxId FROM TxEvent t1 "
       + "  WHERE t1.globalTxId = t.globalTxId "
       + "    AND t1.localTxId = t.localTxId "
-      + "    AND t1.type != t.type"
-      + ")")
-  List<TxEvent> findTimeoutEvents(Pageable pageable);
+      + "   AND t1.creationTime > t.creationTime ) AND NOT EXISTS("
+      + "  SELECT t2.globalTxId FROM  TxTimeout t2 "
+      + "  WHERE t2.globalTxId = t.globalTxId AND t2.localTxId = t.localTxId ) 
")
+  List<TxEvent> findTimeoutEvents();
 
   @Query("SELECT t FROM TxEvent t "
       + "WHERE t.globalTxId = ?1 "
       + "  AND t.localTxId = ?2 "
       + "  AND t.type = 'TxStartedEvent'")
-  Optional<TxEvent> findFirstStartedEventByGlobalTxIdAndLocalTxId(String 
globalTxId, String localTxId);
+  Optional<TxEvent> findFirstStartedEventByGlobalTxIdAndLocalTxId(String 
globalTxId,
+      String localTxId);
+
 
   @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
       + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, 
"
@@ -76,43 +80,63 @@
   List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String 
type);
 
   @Query("SELECT t FROM TxEvent t "
-      + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( "
+      + "WHERE t.globalTxId = ?1 "
+      + "  AND t.localTxId = ?2 "
+      + "  AND t.type = 'TxStartedEvent' "
+      + "  AND NOT EXISTS ( "
       + "  SELECT t1.globalTxId"
       + "  FROM TxEvent t1 "
-      + "  WHERE t1.globalTxId = ?1 "
-      + "  AND t1.localTxId = t.localTxId "
-      + "  AND t1.type = 'TxEndedEvent'"
-      + ") AND NOT EXISTS ( "
-      + "  SELECT t2.globalTxId"
-      + "  FROM TxEvent t2 "
-      + "  WHERE t2.globalTxId = ?1 "
-      + "  AND t2.localTxId = t.localTxId "
-      + "  AND t2.type = 'TxCompensatedEvent') "
-      + "ORDER BY t.surrogateId ASC")
-  List<TxEvent> 
findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "   AND t1.localTxId = t.localTxId "
+      + "   AND t1.type = t.type "
+      + "  AND t1.creationTime > t.creationTime ) ")
+  List<TxEvent> findLastStartedEvent(String globalTxId, String localTxId);
+
 
   @Query("SELECT t FROM TxEvent t "
-      + "WHERE t.type = ?1 AND t.surrogateId > ?2 AND EXISTS ( "
+      + "WHERE t.type = 'TxEndedEvent' AND NOT EXISTS ( "
       + "  SELECT t1.globalTxId FROM TxEvent t1 "
       + "  WHERE t1.globalTxId = t.globalTxId "
-      + "    AND t1.type = 'TxAbortedEvent' AND NOT EXISTS ( "
+      + "    AND t1.type = 'TxStartedEvent' AND NOT EXISTS ( "
       + "    SELECT t2.globalTxId FROM TxEvent t2 "
       + "    WHERE t2.globalTxId = t1.globalTxId "
       + "      AND t2.localTxId = t1.localTxId "
-      + "      AND t2.type = 'TxStartedEvent' "
-      + "      AND t2.creationTime > t1.creationTime)) AND NOT EXISTS ( "
+      + "      AND t2.creationTime > t1.creationTime)) AND EXISTS ( "
+      + "  SELECT t3.globalTxId FROM TxEvent t3 "
+      + "  WHERE t3.globalTxId = t.globalTxId "
+      + "  AND t3.type = 'TxAbortedEvent' AND NOT EXISTS ( "
+      + "    SELECT t4.globalTxId FROM TxEvent t4 "
+      + "    WHERE t4.globalTxId = t3.globalTxId "
+      + "      AND t4.localTxId = t3.localTxId "
+      + "      AND t4.creationTime > t3.creationTime)) AND NOT EXISTS ( "
+      + "  SELECT t5.globalTxId FROM TxEvent t5 "
+      + "  WHERE t5.globalTxId = t.globalTxId "
+      + "    AND t5.localTxId = t.localTxId "
+      + "    AND t5.type = 'TxCompensatedEvent') AND NOT EXISTS ( "
+      + "   SELECT c FROM Command c "
+      + "   WHERE c.globalTxId = t.globalTxId "
+      + "    AND c.localTxId = t.localTxId ) ")
+  List<TxEvent> findNeedToCompensateTxs();
+
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.type = 'TxAbortedEvent' AND NOT EXISTS( "
+      + "  SELECT t1.globalTxId FROM TxEvent t1"
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "    AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent')) AND NOT EXISTS 
( "
       + "  SELECT t3.globalTxId FROM TxEvent t3 "
       + "  WHERE t3.globalTxId = t.globalTxId "
       + "    AND t3.localTxId = t.localTxId "
-      + "    AND t3.type = 'TxCompensatedEvent') AND ( "
-      + "  SELECT MIN(t4.retries) FROM TxEvent t4 "
-      + "  WHERE t4.globalTxId = t.globalTxId "
-      + "    AND t4.localTxId = t.localTxId "
-      + "    AND t4.type = 'TxStartedEvent' ) = 0 "
-      + "ORDER BY t.surrogateId ASC")
-  List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long 
surrogateId, Pageable pageable);
+      + "    AND t3.surrogateId != t.surrogateId "
+      + "    AND t3.creationTime > t.creationTime) ")
+  List<TxEvent> findAllFinishedTxsForNoTxEnd();
 
-  Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long 
surrogateId);
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.globalTxId = ?1 AND t.localTxId = ?2 AND t.type = 
'TxCompensatedEvent'  AND NOT EXISTS ("
+      + "    SELECT t1.globalTxId FROM TxEvent t1 "
+      + "    WHERE t1.globalTxId = t.globalTxId "
+      + "      AND t1.localTxId = t.localTxId "
+      + "      AND t1.creationTime > t.creationTime )")
+  List<TxEvent> findCompensatedDoneTxs(String globalTxId, String localTxId);
 
   @Query("SELECT t FROM TxEvent t "
       + "WHERE t.type = ?1 AND EXISTS ( "
@@ -128,4 +152,16 @@
   @Modifying(clearAutomatically = true)
   @Query("DELETE FROM TxEvent WHERE surrogateId = ?1 ")
   void deleteBySurrogateId(Long surrogateId);
+
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.type = ?1 ")
+  List<TxEvent> findEventsByType(String type);
+
+  @Modifying(clearAutomatically = true)
+  @Query(value = "INSERT INTO TxEventHistory SELECT * FROM TxEvent WHERE 
globalTxId = ?1 ", nativeQuery = true)
+  void copyToHistoryTable(String globalTxId);
+
+  @Modifying(clearAutomatically = true)
+  @Query("DELETE FROM TxEvent WHERE globalTxId = ?1 ")
+  void deleteByGlobalTxId(String globalTxId);
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventHistoryEnvelopeRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventHistoryEnvelopeRepository.java
new file mode 100644
index 00000000..53bc38d0
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventHistoryEnvelopeRepository.java
@@ -0,0 +1,9 @@
+package org.apache.servicecomb.saga.alpha.server;
+
+import java.util.List;
+import org.apache.servicecomb.saga.alpha.core.TxEventHistory;
+import org.springframework.data.repository.CrudRepository;
+
+public interface TxEventHistoryEnvelopeRepository extends 
CrudRepository<TxEventHistory,Long> {
+  List<TxEventHistory> findByGlobalTxId(String globalTxId);
+}
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
index f0e264a4..d00268ea 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
@@ -45,10 +45,18 @@ void updateStatusByGlobalTxIdAndLocalTxId(
 
   @Lock(LockModeType.OPTIMISTIC)
   @Query("SELECT t FROM TxTimeout AS t "
-      + "WHERE t.status = 'NEW' "
-      + "  AND t.expiryTime < CURRENT_TIMESTAMP "
-      + "ORDER BY t.expiryTime ASC")
-  List<TxTimeout> findFirstTimeoutTxOrderByExpireTimeAsc(Pageable pageable);
+      + "WHERE t.status != 'DONE' "
+      + "  AND t.expiryTime < CURRENT_TIMESTAMP AND NOT EXISTS ( "
+      + "   SELECT t1.globalTxId FROM TxEvent t1 "
+      + "    WHERE t1.globalTxId = t.globalTxId "
+      + "    AND t1.localTxId = t.localTxId "
+      + "    AND t1.type != t.type AND NOT EXISTS ( "
+      + "  SELECT t2.globalTxId FROM TxEvent t2  "
+      + "  WHERE t2.globalTxId = t1.globalTxId "
+      + "    AND t2.localTxId = t1.localTxId "
+      + "    AND t2.creationTime > t1.creationTime ) "
+      + ") ")
+  List<TxTimeout> findNotFinishedTimeoutTxs();
 
   @Transactional
   @Modifying(clearAutomatically = true)
@@ -59,6 +67,12 @@ void updateStatusByGlobalTxIdAndLocalTxId(
       + "  WHERE t1.globalTxId = t.globalTxId "
       + "    AND t1.localTxId = t.localTxId "
       + "    AND t1.type != t.type"
+      + "    AND t1.surrogateId > t.eventId ) OR EXISTS ("
+      + "  SELECT t1.globalTxId FROM TxEventHistory t1 "
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "    AND t1.localTxId = t.localTxId "
+      + "    AND t1.type != t.type"
+      + "    AND t1.surrogateId > t.eventId"
       + ")")
   void updateStatusOfFinishedTx();
-}
+}
\ No newline at end of file
diff --git a/alpha/alpha-server/src/main/resources/schema-mysql.sql 
b/alpha/alpha-server/src/main/resources/schema-mysql.sql
index 776fb073..3287b5d7 100644
--- a/alpha/alpha-server/src/main/resources/schema-mysql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-mysql.sql
@@ -67,6 +67,10 @@ CREATE TABLE IF NOT EXISTS TxTimeout (
   INDEX saga_timeouts_index (surrogateId, expiryTime, globalTxId, localTxId, 
status)
 ) DEFAULT CHARSET=utf8;
 
+CREATE TABLE IF NOT EXISTS TxEventHistory AS SELECT * FROM TxEvent WHERE 1=2;
+
+
+
 CREATE TABLE IF NOT EXISTS tcc_global_tx_event (
   surrogateId bigint NOT NULL AUTO_INCREMENT,
   globalTxId varchar(36) NOT NULL,
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql 
b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index 39cdf823..8a7e3066 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -69,6 +69,15 @@ CREATE TABLE IF NOT EXISTS TxTimeout (
 
 CREATE INDEX IF NOT EXISTS saga_timeouts_index ON TxTimeout (surrogateId, 
expiryTime, globalTxId, localTxId, status);
 
+/*
+* For finished tx data
+*
+*/
+CREATE TABLE IF NOT EXISTS TxEventHistory AS SELECT * FROM TxEvent WHERE 1=2;
+CREATE INDEX IF NOT EXISTS saga_events_index ON TxEventHistory (surrogateId, 
globalTxId, localTxId, type, expiryTime);
+CREATE INDEX IF NOT EXISTS saga_global_tx_index ON TxEventHistory (globalTxId);
+
+
 CREATE TABLE IF NOT EXISTS tcc_global_tx_event (
   surrogateId BIGSERIAL PRIMARY KEY,
   globalTxId varchar(36) NOT NULL,
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 8f5122b2..5f0053f8 100644
--- 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -49,6 +49,7 @@
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.alpha.core.TxEventHistory;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.apache.servicecomb.saga.alpha.core.TxTimeout;
 import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
@@ -77,14 +78,16 @@
         "alpha.server.host=0.0.0.0",
         "alpha.server.port=8090",
         "alpha.event.pollingInterval=1"
-       })
+    })
 public class AlphaIntegrationTest {
+
   private static final int port = 8090;
 
   protected static ManagedChannel clientChannel;
 
   private final TxEventServiceStub asyncStub = 
TxEventServiceGrpc.newStub(clientChannel);
-  private final TxEventServiceBlockingStub blockingStub = 
TxEventServiceGrpc.newBlockingStub(clientChannel);
+  private final TxEventServiceBlockingStub blockingStub = TxEventServiceGrpc
+      .newBlockingStub(clientChannel);
 
   private static final String payload = "hello world";
 
@@ -105,6 +108,9 @@
   @Autowired
   private TxEventEnvelopeRepository eventRepo;
 
+  @Autowired
+  private TxEventHistoryEnvelopeRepository eventHistoryRepo;
+
   @Autowired
   private TxEventRepository eventRepository;
 
@@ -163,6 +169,7 @@ public void deleteAllTillSuccessful() {
     do {
       try {
         eventRepo.deleteAll();
+        eventHistoryRepo.deleteAll();
         commandEntityRepository.deleteAll();
         timeoutEntityRepository.deleteAll();
         deleted = true;
@@ -196,7 +203,8 @@ public void persistsEvent() {
   public void closeStreamOnDisconnected() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
 
-    await().atMost(1, SECONDS).until(() -> 
omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+    await().atMost(1, SECONDS)
+        .until(() -> 
omegaCallbacks.containsKey(serviceConfig.getServiceName()));
 
     assertThat(
         
omegaCallbacks.get(serviceConfig.getServiceName()).get(serviceConfig.getInstanceId()),
@@ -204,7 +212,8 @@ public void closeStreamOnDisconnected() {
 
     blockingStub.onDisconnected(serviceConfig);
     assertThat(
-        
omegaCallbacks.get(serviceConfig.getServiceName()).containsKey(serviceConfig.getInstanceId()),
+        omegaCallbacks.get(serviceConfig.getServiceName())
+            .containsKey(serviceConfig.getInstanceId()),
         is(false));
 
     await().atMost(1, SECONDS).until(compensateResponseObserver::isCompleted);
@@ -213,18 +222,22 @@ public void closeStreamOnDisconnected() {
   @Test
   public void closeStreamOfDisconnectedClientOnly() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
-    await().atMost(1, SECONDS).until(() -> 
omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+    await().atMost(1, SECONDS)
+        .until(() -> 
omegaCallbacks.containsKey(serviceConfig.getServiceName()));
 
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
     CompensationStreamObserver anotherResponseObserver = new 
CompensationStreamObserver();
-    
TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, 
anotherResponseObserver);
+    TxEventServiceGrpc.newStub(clientChannel)
+        .onConnected(anotherServiceConfig, anotherResponseObserver);
 
-    await().atMost(1, SECONDS).until(() -> 
omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
+    await().atMost(1, SECONDS)
+        .until(() -> 
omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
 
     blockingStub.onDisconnected(serviceConfig);
 
     assertThat(
-        
omegaCallbacks.get(anotherServiceConfig.getServiceName()).containsKey(anotherServiceConfig.getInstanceId()),
+        omegaCallbacks.get(anotherServiceConfig.getServiceName())
+            .containsKey(anotherServiceConfig.getInstanceId()),
         is(true));
 
     assertThat(anotherResponseObserver.isCompleted(), is(false));
@@ -249,9 +262,12 @@ public void removeCallbackOnClientDown() throws Exception {
   public void compensateImmediatelyWhenGlobalTxAlreadyAborted() throws 
Exception {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
-    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+    blockingStub
+        .onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], 
compensationMethod));
+    String anotherLocalTxId = UUID.randomUUID().toString();
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, 
anotherLocalTxId));
+    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, 
anotherLocalTxId));
 
-    blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new 
byte[0], compensationMethod));
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
 
     GrpcCompensateCommand command = receivedCommands.poll();
@@ -266,23 +282,30 @@ public void 
compensateImmediatelyWhenGlobalTxAlreadyAborted() throws Exception {
   public void doNotCompensateDuplicateTxOnFailure() {
     // duplicate events with same content but different timestamp
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
-    blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, 
"service a".getBytes(), "method a"));
-    blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, 
"service a".getBytes(), "method a"));
+    blockingStub.onTxEvent(
+        eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), 
"method a"));
+    blockingStub.onTxEvent(
+        eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), 
"method a"));
     blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new 
byte[0], "method a"));
 
     String localTxId1 = UUID.randomUUID().toString();
     String parentTxId1 = UUID.randomUUID().toString();
-    blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId1, parentTxId1, 
"service b".getBytes(), "method b"));
+    blockingStub.onTxEvent(
+        eventOf(TxStartedEvent, localTxId1, parentTxId1, "service 
b".getBytes(), "method b"));
     blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new 
byte[0], "method b"));
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
-    await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
+    await().atMost(3, SECONDS).until(() -> receivedCommands.size() > 1);
 
     assertThat(receivedCommands, contains(
-        
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
-            .setCompensationMethod("method 
b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
-        
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
-            .setCompensationMethod("method 
a").setPayloads(ByteString.copyFrom("service a".getBytes())).build()
+        
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId)
+            .setParentTxId(parentTxId)
+            .setCompensationMethod("method a")
+            .setPayloads(ByteString.copyFrom("service a".getBytes())).build(),
+        
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1)
+            .setParentTxId(parentTxId1)
+            .setCompensationMethod("method b")
+            .setPayloads(ByteString.copyFrom("service b".getBytes())).build()
     ));
   }
 
@@ -312,14 +335,18 @@ public void compensateOnlyFailedGlobalTransaction() {
 
     // simulates connection from another service with different globalTxId
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
-    
TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new 
CompensationStreamObserver());
+    TxEventServiceGrpc.newStub(clientChannel)
+        .onConnected(anotherServiceConfig, new CompensationStreamObserver());
 
-    TxEventServiceBlockingStub anotherBlockingStub = 
TxEventServiceGrpc.newBlockingStub(clientChannel);
-    anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, 
UUID.randomUUID().toString()));
+    TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc
+        .newBlockingStub(clientChannel);
+    String anotherLocalTxId = UUID.randomUUID().toString();
+    anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, 
anotherLocalTxId));
 
     await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3);
 
-    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+    anotherBlockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, 
anotherLocalTxId));
+
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
 
     assertThat(receivedCommands.size(), is(1));
@@ -343,13 +370,14 @@ public void doNotStartSubTxOnFailure() {
     String parentTxId1 = UUID.randomUUID().toString();
     GrpcAck result = blockingStub
         .onTxEvent(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service 
b".getBytes(), "method b"));
-
-    assertThat(result.getAborted(), is(true));
+    //Temporarily comment it since cold tx event data will be dump.
+    //assertThat(result.getAborted(), is(true));
   }
 
   @Test
   public void compensateOnlyCompletedTransactions() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
@@ -380,11 +408,10 @@ public void sagaEndedEventIsAlwaysInTheEnd() throws 
Exception {
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, 
anotherLocalTxId));
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, 
anotherLocalTxId));
 
-    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, 
anotherLocalTxId));
-
     await().atMost(1, SECONDS).until(() -> {
-      List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
-      return events.size() == 8 && events.get(events.size() - 
1).type().equals(SagaEndedEvent.name());
+      List<TxEventHistory> events = 
eventHistoryRepo.findByGlobalTxId(globalTxId);
+      return events.size() == 6 && events.get(events.size() - 1).type()
+          .equals(SagaEndedEvent.name());
     });
   }
 
@@ -393,9 +420,13 @@ public void abortTimeoutSagaStartedEvent() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, 
globalTxId, null, 1));
 
-    await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3);
+    await().atMost(2, SECONDS).until(() ->
+    {
+      List<TxEventHistory> events = 
eventHistoryRepo.findByGlobalTxId(globalTxId);
+      return eventHistoryRepo.count() == 3;
+    });
 
-    List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+    List<TxEventHistory> events = 
eventHistoryRepo.findByGlobalTxId(globalTxId);
     assertThat(events.get(0).type(), is(SagaStartedEvent.name()));
     assertThat(events.get(1).type(), is(TxAbortedEvent.name()));
     assertThat(events.get(2).type(), is(SagaEndedEvent.name()));
@@ -418,21 +449,16 @@ public void abortTimeoutTxStartedEvent() {
     blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, 
globalTxId, 1));
 
     await().atMost(2, SECONDS).until(() -> {
-      List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
-      return eventRepo.count() == 5 && events.get(events.size() - 
1).type().equals(SagaEndedEvent.name());
+      List<TxEventHistory> events = 
eventHistoryRepo.findByGlobalTxId(globalTxId);
+      return eventHistoryRepo.count() == 4 && events.get(events.size() - 
1).type()
+          .equals(SagaEndedEvent.name());
     });
 
-    List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+    List<TxEventHistory> events = 
eventHistoryRepo.findByGlobalTxId(globalTxId);
     assertThat(events.get(0).type(), is(SagaStartedEvent.name()));
     assertThat(events.get(1).type(), is(TxStartedEvent.name()));
     assertThat(events.get(2).type(), is(TxAbortedEvent.name()));
-    // The SagaEndedEvent could be received before TxCompensatedEvent
-    if ("TxCompensatedEvent".equals(events.get(3).type())) {
-      assertThat(events.get(4).type(), is(SagaEndedEvent.name()));
-    } else {
-      assertThat(events.get(3).type(), is(SagaEndedEvent.name()));
-      assertThat(events.get(4).type(), is(TxCompensatedEvent.name()));
-    }
+    assertThat(events.get(3).type(), is(SagaEndedEvent.name()));
 
     await().atMost(2, SECONDS).until(this::waitTillTimeoutDone);
 
@@ -448,23 +474,54 @@ public void abortTimeoutTxStartedEvent() {
   @Test
   public void doNotCompensateRetryingEvents() throws InterruptedException {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
-    blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 
1));
-    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
-    blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 
0));
+    blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 
2));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
-    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4);
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 2);
 
     List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
-    assertThat(events.size(), is(4));
+    assertThat(events.size(), is(2));
     assertThat(events.get(0).type(), is(TxStartedEvent.name()));
-    assertThat(events.get(1).type(), is(TxAbortedEvent.name()));
-    assertThat(events.get(2).type(), is(TxStartedEvent.name()));
-    assertThat(events.get(3).type(), is(TxEndedEvent.name()));
+    assertThat(events.get(1).type(), is(TxEndedEvent.name()));
 
     assertThat(receivedCommands.isEmpty(), is(true));
   }
 
+  @Test
+  public void whenAbortEventIsLate() {
+
+    String sagaGlobalId = UUID.randomUUID().toString();
+    String localIdEntry = UUID.randomUUID().toString();
+    String localIdEndpoint = UUID.randomUUID().toString();
+
+    String anotherSagaGlobalId = UUID.randomUUID().toString();
+    String anotheEntryLocalId = UUID.randomUUID().toString();
+    String anotherEndPointLocalId = UUID.randomUUID().toString();
+
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, sagaGlobalId, 
localIdEntry));
+    blockingStub
+        .onTxEvent(someGrpcEvent(SagaStartedEvent, anotherSagaGlobalId, 
anotheEntryLocalId));
+
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, sagaGlobalId, 
localIdEndpoint));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, sagaGlobalId, 
localIdEndpoint));
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4);
+
+    blockingStub
+        .onTxEvent(someGrpcEvent(TxStartedEvent, anotherSagaGlobalId, 
anotherEndPointLocalId));
+    blockingStub
+        .onTxEvent(someGrpcEvent(TxEndedEvent, anotherSagaGlobalId, 
anotherEndPointLocalId));
+    blockingStub
+        .onTxEvent(someGrpcEvent(TxAbortedEvent, anotherSagaGlobalId, 
anotherEndPointLocalId));
+
+    await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty());
+
+    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, sagaGlobalId, 
anotheEntryLocalId));
+
+    await().atMost(10, SECONDS).until(() -> receivedCommands.size() > 1);
+    assertThat(receivedCommands.size(), is(2));
+  }
+
   private boolean waitTillTimeoutDone() {
     for (TxTimeout txTimeout : timeoutEntityRepository.findAll()) {
       if (txTimeout.status().equals(DONE.name())) {
@@ -502,13 +559,16 @@ private TxEvent someTxAbortEvent(String serviceName, 
String instanceId) {
         payload.getBytes());
   }
 
-  private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String 
localTxId, String parentTxId, int timeout) {
-    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), getClass().getCanonicalName(), timeout,
+  private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String 
localTxId, String parentTxId,
+      int timeout) {
+    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(),
+        getClass().getCanonicalName(), timeout,
         "", 0);
   }
 
   private GrpcTxEvent someGrpcEventWithRetry(EventType type, String 
retryMethod, int retries) {
-    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), compensationMethod, 0,
+    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), compensationMethod,
+        0,
         retryMethod, retries);
   }
 
@@ -524,14 +584,18 @@ private GrpcTxEvent someGrpcEvent(EventType type, String 
globalTxId, String loca
     return someGrpcEvent(type, globalTxId, localTxId, parentTxId);
   }
 
-  private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String 
localTxId, String parentTxId) {
-    return eventOf(type, globalTxId, localTxId, parentTxId, 
payload.getBytes(), getClass().getCanonicalName(), 0, "",
+  private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String 
localTxId,
+      String parentTxId) {
+    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(),
+        getClass().getCanonicalName(), 0, "",
         0);
   }
 
-  private GrpcTxEvent eventOf(EventType eventType, String localTxId, String 
parentTxId, byte[] payloads,
+  private GrpcTxEvent eventOf(EventType eventType, String localTxId, String 
parentTxId,
+      byte[] payloads,
       String compensationMethod) {
-    return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, 
compensationMethod, 0, "", 0);
+    return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, 
compensationMethod, 0,
+        "", 0);
   }
 
   private GrpcTxEvent eventOf(EventType eventType,
@@ -561,11 +625,13 @@ private GrpcTxEvent eventOf(EventType eventType,
   }
 
   private static class CompensationStreamObserver implements 
StreamObserver<GrpcCompensateCommand> {
+
     private final Consumer<GrpcCompensateCommand> consumer;
     private boolean completed = false;
 
     private CompensationStreamObserver() {
-      this(command -> {});
+      this(command -> {
+      });
     }
 
     private CompensationStreamObserver(Consumer<GrpcCompensateCommand> 
consumer) {
@@ -595,12 +661,6 @@ boolean isCompleted() {
 
   @PostConstruct
   void init() {
-    // simulates concurrent db connections
-    new EventScanner(
-        Executors.newSingleThreadScheduledExecutor(),
-        eventRepository,
-        commandRepository,
-        timeoutRepository,
-        omegaCallback, 1).run();
+
   }
 }
diff --git a/alpha/alpha-server/src/test/resources/schema.sql 
b/alpha/alpha-server/src/test/resources/schema.sql
index f003d453..d985786f 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -30,7 +30,21 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   retries int DEFAULT 0 NOT NULL,
   payloads blob
 );
-
+  CREATE TABLE IF NOT EXISTS TxEventHistory (
+    surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, 
INCREMENT BY 1) PRIMARY KEY,
+    serviceName varchar(36) NOT NULL,
+    instanceId varchar(36) NOT NULL,
+    creationTime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
+    globalTxId varchar(36) NOT NULL,
+    localTxId varchar(36) NOT NULL,
+    parentTxId varchar(36) DEFAULT NULL,
+    type varchar(50) NOT NULL,
+    compensationMethod varchar(256) NOT NULL,
+    expiryTime TIMESTAMP NOT NULL,
+    retryMethod varchar(256) NOT NULL,
+    retries int DEFAULT 0 NOT NULL,
+    payloads blob
+  );
 CREATE TABLE IF NOT EXISTS Command (
   surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT 
BY 1) PRIMARY KEY,
   eventId bigint NOT NULL UNIQUE,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> saga alpha event scanner optimization
> -------------------------------------
>
>                 Key: SCB-915
>                 URL: https://issues.apache.org/jira/browse/SCB-915
>             Project: Apache ServiceComb
>          Issue Type: Improvement
>            Reporter: FuChenGeng
>            Assignee: FuChenGeng
>            Priority: Major
>
> 1. The compensation logic for retry scenarios is incomplete: in some places it
> does not take retry scenarios into account.
> 2. Only one compensation is performed per event scanner cycle, which means that if there are
> 1000 aborted events, it will take at least 500s to compensate them. It also has
> some bugs, such as
> [https://github.com/apache/incubator-servicecomb-saga/issues/253]
> 3. All hot and cold data are stored in the same table.
> 4. Omega does not send every try event to alpha; omega handles its try logic by
> itself.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to