This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit bd5ed773cf5fcb63c2b52f59de2189c2bdb01424
Author: zhang2014 <cos...@gmail.com>
AuthorDate: Wed Jan 17 23:01:55 2018 +0800

    SCB-224: alpha support retries
---
 .../saga/alpha/core/CompositeOmegaCallback.java    | 21 +++++++++
 .../servicecomb/saga/alpha/core/OmegaCallback.java |  2 +
 .../saga/alpha/core/PushBackOmegaCallback.java     | 10 ++++
 .../saga/alpha/core/TxConsistentService.java       | 55 ++++++++++++++++++++--
 .../servicecomb/saga/alpha/core/TxEvent.java       | 13 +++++
 .../saga/alpha/server/GrpcOmegaCallback.java       | 12 +++++
 .../src/main/proto/GrpcTxEvent.proto               |  2 +
 7 files changed, 110 insertions(+), 5 deletions(-)

diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
index 32e5102..bce8f4b 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
@@ -29,6 +29,27 @@ public class CompositeOmegaCallback implements OmegaCallback 
{
   }
 
   @Override
+  public void retries(TxEvent event) {
+    Map<String, OmegaCallback> serviceCallbacks = 
callbacks.getOrDefault(event.serviceName(), emptyMap());
+
+    if (serviceCallbacks.isEmpty()) {
+      throw new AlphaException("No such omega callback found for service " + 
event.serviceName());
+    }
+
+    OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
+    if (omegaCallback == null) {
+      omegaCallback = serviceCallbacks.values().iterator().next();
+    }
+
+    try {
+      omegaCallback.compensate(event);
+    } catch (Exception e) {
+      serviceCallbacks.values().remove(omegaCallback);
+      throw e;
+    }
+  }
+
+  @Override
   public void compensate(TxEvent event) {
     Map<String, OmegaCallback> serviceCallbacks = 
callbacks.getOrDefault(event.serviceName(), emptyMap());
 
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
index f60a44d..d926ed0 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -18,6 +18,8 @@
 package org.apache.servicecomb.saga.alpha.core;
 
 public interface OmegaCallback {
+  /**
+   * Requests a retry for the transaction described by {@code event}.
+   * NOTE(review): presumably the omega is expected to re-run the transaction through
+   * {@code event.retriesMethod()} instead of compensating it; confirm against callers.
+   *
+   * @param event the event identifying the transaction to retry
+   */
+  void retries(TxEvent event);
+
   void compensate(TxEvent event);
 
   default void disconnect() {
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
index 3b27c14..290eb20 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
@@ -35,6 +35,16 @@ public class PushBackOmegaCallback implements OmegaCallback {
   }
 
   @Override
+  public void retries(TxEvent event) {
+    try {
+      underlying.compensate(event);
+    } catch (Exception e) {
+      logError(event, e);
+      pendingCompensations.offer(() -> compensate(event));
+    }
+  }
+
+  @Override
   public void compensate(TxEvent event) {
     try {
       underlying.compensate(event);
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 968e5b7..a9a3ed2 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,17 +17,18 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+
 
 public class TxConsistentService {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -61,6 +62,50 @@ public class TxConsistentService {
     return true;
   }
 
+//  private void compensate(TxEvent event) {
+//    List<TxEvent> events = 
eventRepository.findTransactionsToCompensate(event.globalTxId());
+//
+//    Optional<TxEvent> startedEvent = events.stream().filter(e -> 
e.containChildren(event)).findFirst();
+//
+//    startedEvent.ifPresent(compensateEvent -> {
+//      Integer[] retiesAndTimes = eventsToRetries.compute(event.parentTxId(), 
(k, v) ->
+//          v == null ? new Integer[] {compensateEvent.retries(), 0} : new 
Integer[] {v[0], v[1] + 1});
+//      List<TxEvent> compensationEvents =
+//          retiesAndTimes[0] >= retiesAndTimes[1] ? events : 
Collections.singletonList(
+//              new TxEvent(
+//                  event.serviceName(), event.instanceId(), 
event.creationTime(), event.globalTxId(),
+//                  event.localTxId(), event.parentTxId(), event.type(), 
event.retriesMethod(), event.payloads()
+//              ));
+//
+//      compensateImpl(event.globalTxId(), compensationEvents);
+//    });
+//  }
+//
+//  private void compensateImpl(String globalTxId, List<TxEvent> events) {
+//    events.removeIf(this::isCompensationScheduled);
+//
+//    Set<String> localTxIds = eventsToCompensate.computeIfAbsent(globalTxId, 
k -> new HashSet<>());
+//    events.forEach(e -> localTxIds.add(e.localTxId()));
+//
+//    events.forEach(omegaCallback::compensate);
+//  }
+
+  // TODO: 2018/1/13 SagaEndedEvent may still not be the last event, because some
+  // omegas may be on a slow network and their TxEndedEvent may arrive late,
+  // unless we ask the user to specify a name for each participant in the global
+  // TX in @Compensable
+//  private void updateCompensateStatus(TxEvent event) {
+//    Set<String> events = eventsToCompensate.get(event.globalTxId());
+//    if (events != null) {
+//      events.remove(event.localTxId());
+//      if (events.isEmpty()) {
+//        eventsToCompensate.remove(event.globalTxId());
+//        Integer[] retiesAndTimes = eventsToRetries.get(event.parentTxId());
+//        if (retiesAndTimes == null || retiesAndTimes[0] >= 
retiesAndTimes[1]) {
+//          markGlobalTxEnd(event);
+//          eventsToRetries.remove(event.parentTxId());
+//        }
+//      }
+//    }
+//  }
+  /**
+   * Checks whether the event repository returns at least one {@code TxAbortedEvent}
+   * for the global transaction that the given event belongs to.
+   */
   private boolean isGlobalTxAborted(TxEvent event) {
     return !eventRepository.findTransactions(event.globalTxId(), 
TxAbortedEvent.name()).isEmpty();
   }
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 76dfca7..5176ce0 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -48,6 +48,8 @@ public class TxEvent {
   private String compensationMethod;
   private Date expiryTime;
   private byte[] payloads;
+  private int retries; // exposed via retries(); NOTE(review): presumably the allowed number of retry attempts, confirm
+  private String retriesMethod; // exposed via retriesMethod(); GrpcOmegaCallback.retries sends it as the command's compensate method
 
   private TxEvent() {
   }
@@ -207,4 +209,15 @@ public class TxEvent {
         ", expiryTime='" + expiryTime + '\'' +
         '}';
   }
+  /**
+   * Returns the value of this event's {@code retries} field.
+   *
+   * @return the stored retries value
+   */
+  public int retries() {
+    return this.retries;
+  }
+
+  /**
+   * Returns the value of this event's {@code retriesMethod} field.
+   *
+   * @return the stored retries method, as held by the field
+   */
+  public String retriesMethod() {
+    return this.retriesMethod;
+  }
+
+  /**
+   * Tells whether the given event is a direct child of this one, i.e. whether its
+   * parent transaction id equals this event's local transaction id.
+   *
+   * @param event the candidate child event
+   * @return {@code true} if {@code event.parentTxId} equals this event's {@code localTxId}
+   */
+  public boolean containChildren(TxEvent event) {
+    final String parentOfCandidate = event.parentTxId;
+    return localTxId.equals(parentOfCandidate);
+  }
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index 5a95281..83a6b9d 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -37,6 +37,18 @@ class GrpcOmegaCallback implements OmegaCallback {
   }
 
   @Override
+  public void retries(TxEvent event) {
+    GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
+        .setGlobalTxId(event.globalTxId())
+        .setLocalTxId(event.localTxId())
+        .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
+        .setCompensateMethod(event.retriesMethod())
+        .setPayloads(ByteString.copyFrom(event.payloads()))
+        .build();
+    observer.onNext(command);
+  }
+
+  @Override
   public void compensate(TxEvent event) {
     GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
         .setGlobalTxId(event.globalTxId())
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto 
b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 3944eee..9605a37 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -47,6 +47,8 @@ message GrpcTxEvent {
   string serviceName = 8;
   string instanceId = 9;
   int32 timeout = 10;
+  int32 retries = 11;
+  string retriesMethod = 12;
 }
 
 message GrpcCompensateCommand {

-- 
To stop receiving notification emails like this one, please contact
ningji...@apache.org.

Reply via email to