This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 91ddb85d06acde1ddd91cb50c4fa47e98b31bc2a
Author: cherrylzhao <[email protected]>
AuthorDate: Mon Aug 27 14:31:42 2018 +0800

    SCB-856 Refactor TCC alpha server with TccCallbackEngine.
---
 .../servicecomb/saga/alpha/server/AlphaConfig.java | 17 +++---
 .../{OmegaCallback.java => CallbackEngine.java}    |  9 ++--
 .../alpha/server/tcc/GrpcOmegaTccCallback.java     |  4 +-
 .../saga/alpha/server/tcc/GrpcTccEventService.java | 22 ++++----
 .../saga/alpha/server/tcc/OmegaCallback.java       |  3 +-
 ...megaCallback.java => OmegaCallbackWrapper.java} | 15 ++++--
 .../alpha/server/tcc/OmegaCallbacksRegistry.java   | 11 ++++
 .../saga/alpha/server/tcc/TccCallbackEngine.java   | 61 ++++++++++++++++++++++
 8 files changed, 108 insertions(+), 34 deletions(-)

diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 8d9af1e..91143f4 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -35,6 +35,8 @@ import 
org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
 import org.apache.servicecomb.saga.alpha.server.tcc.GrpcTccEventService;
+import org.apache.servicecomb.saga.alpha.server.tcc.OmegaCallbackWrapper;
+import org.apache.servicecomb.saga.alpha.server.tcc.TccCallbackEngine;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
 import org.springframework.context.annotation.Bean;
@@ -90,14 +92,11 @@ class AlphaConfig {
       CommandRepository commandRepository,
       TxTimeoutRepository timeoutRepository,
       OmegaCallback omegaCallback) {
-
-    new EventScanner(scheduler,
-        eventRepository, commandRepository, timeoutRepository,
-        omegaCallback, eventPollingInterval).run();
-
-    TxConsistentService consistentService = new 
TxConsistentService(eventRepository);
-
-    return consistentService;
+        new EventScanner(scheduler,
+            eventRepository, commandRepository, timeoutRepository,
+            omegaCallback, eventPollingInterval).run();
+        TxConsistentService consistentService = new 
TxConsistentService(eventRepository);
+        return consistentService;
   }
 
   @Bean
@@ -105,7 +104,7 @@ class AlphaConfig {
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
     ServerStartable bootstrap = new GrpcStartable(serverConfig,
         new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks),
-        new GrpcTccEventService());
+        new GrpcTccEventService(new TccCallbackEngine(new 
OmegaCallbackWrapper())));
     new Thread(bootstrap::start).start();
     return bootstrap;
   }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/CallbackEngine.java
similarity index 81%
copy from 
alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
copy to 
alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/CallbackEngine.java
index 369472c..e69eb6a 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/CallbackEngine.java
@@ -17,12 +17,9 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
-import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
 
-public interface OmegaCallback {
+public interface CallbackEngine {
 
-  void invoke(ParticipatedEvent event, String status);
-
-  default void disconnect() {
-  }
+   boolean execute(GrpcTccTransactionEndedEvent request);
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
index 8ea7cfb..e1fb68a 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
@@ -34,13 +34,13 @@ public final class GrpcOmegaTccCallback implements 
OmegaCallback {
   }
 
   @Override
-  public void invoke(ParticipatedEvent event, String status) {
+  public void invoke(ParticipatedEvent event, TransactionStatus status) {
     GrpcTccCoordinateCommand command = GrpcTccCoordinateCommand.newBuilder()
         .setGlobalTxId(event.getGlobalTxId())
         .setLocalTxId(event.getLocalTxId())
         .setParentTxId(event.getParentTxId() == null ? "" : 
event.getParentTxId())
         .setServiceName(event.getServiceName())
-        .setMethod("Succeed".equals(status) ? event.getConfirmMethod() : 
event.getCancelMethod())
+        .setMethod(TransactionStatus.Succeed.equals(status) ? 
event.getConfirmMethod() : event.getCancelMethod())
         .build();
     responseObserver.onNext(command);
   }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
index ef9adbe..aa20f77 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
@@ -18,9 +18,7 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import io.grpc.stub.StreamObserver;
-import org.apache.servicecomb.saga.alpha.core.AlphaException;
 import 
org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipateEventFactory;
-import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
@@ -35,8 +33,16 @@ import 
org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
 public class GrpcTccEventService extends 
TccEventServiceGrpc.TccEventServiceImplBase {
 
   private static final GrpcAck ALLOW = 
GrpcAck.newBuilder().setAborted(false).build();
+  
   private static final GrpcAck REJECT = 
GrpcAck.newBuilder().setAborted(true).build();
 
+  private final TccCallbackEngine tccCallbackEngine;
+
+  public GrpcTccEventService(
+      TccCallbackEngine tccCallbackEngine) {
+    this.tccCallbackEngine = tccCallbackEngine;
+  }
+
   @Override
   public void onConnected(GrpcServiceConfig request, 
StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
     OmegaCallbacksRegistry.register(request, responseObserver);
@@ -57,18 +63,10 @@ public class GrpcTccEventService extends 
TccEventServiceGrpc.TccEventServiceImpl
 
   @Override
   public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, 
StreamObserver<GrpcAck> responseObserver) {
-    try {
-      for (ParticipatedEvent event : 
TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
-        OmegaCallbacksRegistry.retrieve(event.getServiceName(), 
event.getInstanceId())
-            .invoke(event, request.getStatus());
-      }
-    } catch (AlphaException ex) {
-      responseObserver.onNext(REJECT);
-    }
-    responseObserver.onNext(ALLOW);
+    responseObserver.onNext(tccCallbackEngine.execute(request) ? ALLOW : 
REJECT);
     responseObserver.onCompleted();
   }
-
+  
   @Override
   public void onDisconnected(GrpcServiceConfig request, 
StreamObserver<GrpcAck> responseObserver) {
     OmegaCallback omegaCallback = 
OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), 
request.getInstanceId());
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
index 369472c..41a6c06 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
@@ -18,10 +18,11 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
 
 public interface OmegaCallback {
 
-  void invoke(ParticipatedEvent event, String status);
+  void invoke(ParticipatedEvent event, TransactionStatus status);
 
   default void disconnect() {
   }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbackWrapper.java
similarity index 65%
copy from 
alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
copy to 
alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbackWrapper.java
index 369472c..3b55ffa 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbackWrapper.java
@@ -18,11 +18,18 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
 
-public interface OmegaCallback {
+public class OmegaCallbackWrapper implements OmegaCallback {
 
-  void invoke(ParticipatedEvent event, String status);
-
-  default void disconnect() {
+  @Override
+  public void invoke(ParticipatedEvent event, TransactionStatus status) {
+    OmegaCallback omegaCallback = 
OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId());
+    try {
+      omegaCallback.invoke(event, status);
+    } catch (Exception ex) {
+      OmegaCallbacksRegistry.removeByValue(event.getServiceName(), 
omegaCallback);
+      throw ex;
+    }
   }
 }
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
index 834a5a2..9e228da 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
@@ -20,17 +20,22 @@ package org.apache.servicecomb.saga.alpha.server.tcc;
 import static java.util.Collections.emptyMap;
 
 import io.grpc.stub.StreamObserver;
+import java.lang.invoke.MethodHandles;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.saga.alpha.core.AlphaException;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Manage omega callbacks.
  */
 public final class OmegaCallbacksRegistry {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private final static Map<String, Map<String, OmegaCallback>> REGISTRY = new 
ConcurrentHashMap<>();
 
   public static Map<String, Map<String, OmegaCallback>> getRegistry() {
@@ -64,11 +69,17 @@ public final class OmegaCallbacksRegistry {
     }
     OmegaCallback result = callbackMap.get(instanceId);
     if (null == result) {
+      LOG.info("Cannot find the service with the instanceId {}, call the other 
instance.", instanceId);
       return callbackMap.values().iterator().next();
     }
     return result;
   }
 
+  public static void removeByValue(String serviceName, OmegaCallback 
omegaCallback) {
+    Map<String, OmegaCallback> callbackMap = 
REGISTRY.getOrDefault(serviceName, emptyMap());
+    callbackMap.values().remove(omegaCallback);
+  }
+
   /**
    * Retrieve omega TCC callback by service name and instance id, then remove 
it from registry.
    *
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngine.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngine.java
new file mode 100644
index 0000000..f9aae88
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngine.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TccCallbackEngine implements CallbackEngine {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OmegaCallbackWrapper omegaCallbackWrapper;
+
+  public TccCallbackEngine(OmegaCallbackWrapper omegaCallbackWrapper) {
+    this.omegaCallbackWrapper = omegaCallbackWrapper;
+  }
+
+  @Override
+  public boolean execute(GrpcTccTransactionEndedEvent request) {
+    boolean result = true;
+    for (ParticipatedEvent event : 
TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
+      try {
+        omegaCallbackWrapper.invoke(event, 
TransactionStatus.valueOf(request.getStatus()));
+      } catch (Exception ex) {
+        logError(event, ex);
+        result = false;
+      }
+    }
+    return result;
+  }
+
+  private void logError(ParticipatedEvent event, Exception ex) {
+    LOG.error(
+        "Failed to invoke service [{}] instance [{}] with method [{}], global 
tx id [{}] and local tx id [{}]",
+        event.getServiceName(),
+        event.getInstanceId(),
+        TransactionStatus.Succeed.equals(event.getStatus()) ? 
event.getConfirmMethod() : event.getCancelMethod(),
+        event.getGlobalTxId(),
+        event.getLocalTxId(),
+        ex);
+  }
+}

Reply via email to