This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 9d2a0b84166403690685e17f9f3ba176204fdfbe
Author: zhang2014 <cos...@gmail.com>
AuthorDate: Tue Jan 23 23:42:35 2018 +0800

    SCB-224 alpha:support retries use command
---
 .../servicecomb/saga/alpha/core/Command.java       |  4 +-
 .../saga/alpha/core/CommandRepository.java         |  2 +-
 .../servicecomb/saga/alpha/core/OmegaCallback.java |  3 +-
 .../saga/alpha/core/TxConsistentService.java       | 54 ++----------
 .../saga/alpha/server/SpringCommandRepository.java | 14 ++++
 .../saga/alpha/server/TxEventEnvelope.java         | 96 ----------------------
 .../alpha/server/TxEventEnvelopeRepository.java    |  1 +
 .../src/main/resources/schema-postgresql.sql       |  2 +
 .../saga/alpha/server/AlphaIntegrationTest.java    | 49 +++++++++++
 alpha/alpha-server/src/test/resources/schema.sql   |  2 +
 .../saga/omega/context/CompensationContext.java    | 15 ++--
 .../saga/omega/context/OmegaContext.java           |  2 -
 .../omega/transaction/CompensableInterceptor.java  |  2 -
 13 files changed, 83 insertions(+), 163 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 0f016d3..4edb928 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -115,11 +115,11 @@ public class Command {
     return localTxId;
   }
 
-  String parentTxId() {
+  public String parentTxId() {
     return parentTxId;
   }
 
-  String compensationMethod() {
+  public String compensationMethod() {
     return compensationMethod;
   }
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 2bbea77..25288ed 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 public interface CommandRepository {
 
-  void saveCompensationCommands(String globalTxId);
+  void saveCompensationCommands(TxEvent abortEvent);
 
   void markCommandAsDone(String globalTxId, String localTxId);
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
index c7dfbbb..f60a44d 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -20,5 +20,6 @@ package org.apache.servicecomb.saga.alpha.core;
 public interface OmegaCallback {
   void compensate(TxEvent event);
 
-  default void disconnect() {}
+  default void disconnect() {
+  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index b519c1b..968e5b7 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,17 +17,17 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
-import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TxConsistentService {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -61,50 +61,6 @@ public class TxConsistentService {
     return true;
   }
 
-//  private void compensate(TxEvent event) {
-//    List<TxEvent> events = eventRepository.findTransactionsToCompensate(event.globalTxId());
-//
-//    Optional<TxEvent> startedEvent = events.stream().filter(e -> e.containChildren(event)).findFirst();
-//
-//    startedEvent.ifPresent(compensateEvent -> {
-//      Integer[] retiesAndTimes = eventsToRetries.compute(event.parentTxId(), (k, v) ->
-//          v == null ? new Integer[] {compensateEvent.retries(), 0} : new Integer[] {v[0], v[1] + 1});
-//      List<TxEvent> compensationEvents =
-//          retiesAndTimes[0] >= retiesAndTimes[1] ? events : Collections.singletonList(
-//              new TxEvent(
-//                  event.serviceName(), event.instanceId(), event.creationTime(), event.globalTxId(),
-//                  event.localTxId(), event.parentTxId(), event.type(), event.retriesMethod(), event.payloads()
-//              ));
-//
-//      compensateImpl(event.globalTxId(), compensationEvents);
-//    });
-//  }
-//
-//  private void compensateImpl(String globalTxId, List<TxEvent> events) {
-//    events.removeIf(this::isCompensationScheduled);
-//
-//    Set<String> localTxIds = eventsToCompensate.computeIfAbsent(globalTxId, k -> new HashSet<>());
-//    events.forEach(e -> localTxIds.add(e.localTxId()));
-//
-//    events.forEach(omegaCallback::compensate);
-//  }
-
-  // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
-  // unless we ask user to specify a name for each participant in the global TX in @Compensable
-//  private void updateCompensateStatus(TxEvent event) {
-//    Set<String> events = eventsToCompensate.get(event.globalTxId());
-//    if (events != null) {
-//      events.remove(event.localTxId());
-//      if (events.isEmpty()) {
-//        eventsToCompensate.remove(event.globalTxId());
-//        Integer[] retiesAndTimes = eventsToRetries.get(event.parentTxId());
-//        if (retiesAndTimes == null || retiesAndTimes[0] >= retiesAndTimes[1]) {
-//          markGlobalTxEnd(event);
-//          eventsToRetries.remove(event.parentTxId());
-//        }
-//      }
-//    }
-//  }
   private boolean isGlobalTxAborted(TxEvent event) {
     return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 086f88e..ed7b4f1 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -91,4 +91,18 @@ public class SpringCommandRepository implements CommandRepository {
 
     return commands;
   }
+
+  private long retriedTimes(String globalTxId, String retriesMethod, String localTxId) {
+    return commandRepository.findByGlobalTxIdAndStatus(globalTxId, DONE.name()).stream()
+        .filter(c -> Objects.equals(c.compensationMethod(), retriesMethod)
+            && Objects.equals(c.localTxId(), localTxId)).count();
+  }
+
+  private List<TxEvent> createRetriesTxEvent(long abortEventId, TxEvent txEvent) {
+    return Collections.singletonList(new TxEvent(
+        abortEventId, txEvent.serviceName(), txEvent.instanceId(), txEvent.creationTime(),
+        txEvent.globalTxId(), txEvent.localTxId(), txEvent.parentTxId(),
+        txEvent.type(), txEvent.retriesMethod(), txEvent.payloads()
+    ));
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java
deleted file mode 100644
index 7d93462..0000000
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.alpha.server;
-
-import java.util.Date;
-
-import javax.persistence.Embedded;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.Id;
-
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
-
-@Entity class TxEventEnvelope {
-  @Id
-  @GeneratedValue
-  private long surrogateId;
-
-  @Embedded
-  private TxEvent event;
-
-  private TxEventEnvelope() {
-  }
-
-  TxEventEnvelope(TxEvent event) {
-    this.event = event;
-  }
-
-  public TxEventEnvelope(
-      String serviceName, String instanceId, String globalTxId,
-      String localTxId, String parentTxId, String type, String compensationMethod,
-      String retriesMethod, int retries, byte[] payloads) {
-    this.event = new TxEvent(
-        serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type,
-        compensationMethod, retriesMethod, retries, payloads);
-  }
-
-  String serviceName() {
-    return event.serviceName();
-  }
-
-  String instanceId() {
-    return event.instanceId();
-  }
-
-  public long creationTime() {
-    return event.creationTime().getTime();
-  }
-
-  String globalTxId() {
-    return event.globalTxId();
-  }
-
-  String localTxId() {
-    return event.localTxId();
-  }
-
-  String parentTxId() {
-    return event.parentTxId();
-  }
-
-  String type() {
-    return event.type();
-  }
-
-  String compensationMethod() {
-    return event.compensationMethod();
-  }
-
-  byte[] payloads() {
-    return event.payloads();
-  }
-
-  int retries() {
-    return event.retries();
-  }
-
-  TxEvent event() {
-    return event;
-  }
-}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 470caa5..3a7edb3 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -73,6 +73,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  FROM TxEvent t2 "
       + "  WHERE t2.globalTxId = ?1 "
       + "  AND t2.localTxId = t.localTxId "
+      + "  AND t2.compensationMethod != t.retriesMethod "
       + "  AND t2.type = 'TxCompensatedEvent') "
       + "ORDER BY t.surrogateId ASC")
   List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index 41815ee..674e051 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -26,6 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   expiryTime timestamp(6) NOT NULL,
+  retriesMethod varchar(256) NOT NULL,
+  retries int NOT NULL,
   payloads bytea
 );
 
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 8b2672c..26aa17a 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -438,6 +438,55 @@ public class AlphaIntegrationTest {
     return false;
   }
 
+  @Test
+  public void retiesAndCompensateOnFailure() throws Exception {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+
+    String localTxId1 = UUID.randomUUID().toString();
+
+    blockingStub.onTxEvent(GrpcTxEvent.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .setTimestamp(System.currentTimeMillis())
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId1)
+        .setParentTxId(parentTxId)
+        .setType(TxStartedEvent.name())
+        .setCompensationMethod("Compensation Method")
+        .setPayloads(ByteString.copyFrom(payload.getBytes()))
+        .setRetries(3).setRetriesMethod("Retries Method")
+        .build());
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, localTxId1));
+
+    await().atMost(3, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
+
+    for (int i = 0; i < 3; i++) {
+      blockingStub.onTxEvent(
+          eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
+
+      await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
+
+      GrpcCompensateCommand command = receivedCommands.poll();
+      assertThat(command.getGlobalTxId(), is(globalTxId));
+      assertThat(command.getLocalTxId(), is(localTxId1));
+      assertThat(command.getParentTxId(), is(parentTxId));
+      assertThat(command.getCompensateMethod(), is("Retries Method"));
+      assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
+    }
+
+    blockingStub.onTxEvent(
+        eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
+
+    await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
+
+    GrpcCompensateCommand command = receivedCommands.poll();
+    assertThat(command.getGlobalTxId(), is(globalTxId));
+    assertThat(command.getLocalTxId(), is(localTxId1));
+    assertThat(command.getParentTxId(), is(parentTxId));
+    assertThat(command.getCompensateMethod(), is("Compensation Method"));
+    assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
+  }
+
   private GrpcAck onCompensation(GrpcCompensateCommand command) {
     return blockingStub.onTxEvent(
         eventOf(TxCompensatedEvent,
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index a10a4e0..ca46625 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -26,6 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   expiryTime TIMESTAMP NOT NULL,
+  retriesMethod varchar(256) NOT NULL,
+  retries int NOT NULL,
   payloads varbinary(10240)
 );
 
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
index 8d9eb7e..48a67f7 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
@@ -29,19 +29,15 @@ import org.slf4j.LoggerFactory;
 public class CompensationContext {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final Map<String, TransactionContextInternal> contexts = new ConcurrentHashMap<>();
-
-  public CompensationContext() {
-  }
+  private final Map<String, CompensationContextInternal> contexts = new ConcurrentHashMap<>();
 
   public void addCompensationContext(Method compensationMethod, Object target) {
     compensationMethod.setAccessible(true);
-    contexts.put(compensationMethod.toString(),
-        new TransactionContextInternal(target, compensationMethod));
+    contexts.put(compensationMethod.toString(), new CompensationContextInternal(target, compensationMethod));
   }
 
   public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
-    TransactionContextInternal contextInternal = contexts.get(compensationMethod);
+    CompensationContextInternal contextInternal = contexts.get(compensationMethod);
 
     try {
       contextInternal.compensationMethod.invoke(contextInternal.target, payloads);
@@ -54,12 +50,11 @@ public class CompensationContext {
     }
   }
 
-  private static final class TransactionContextInternal {
+  private static final class CompensationContextInternal {
     private final Object target;
-
     private final Method compensationMethod;
 
-    private TransactionContextInternal(Object target, Method compensationMethod) {
+    private CompensationContextInternal(Object target, Method compensationMethod) {
       this.target = target;
       this.compensationMethod = compensationMethod;
     }
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index ac9f02c..daa8e7c 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -17,8 +17,6 @@
 
 package org.apache.servicecomb.saga.omega.context;
 
-import java.util.Map;
-
 public class OmegaContext {
   public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id";
   public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id";
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index d3e2bd1..988d8d7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -21,9 +21,7 @@ import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 
 class CompensableInterceptor implements EventAwareInterceptor {
-
   private final OmegaContext context;
-
   private final MessageSender sender;
 
   CompensableInterceptor(OmegaContext context, MessageSender sender) {

-- 
To stop receiving notification emails like this one, please contact
ningji...@apache.org.

Reply via email to