This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 91ddb85d06acde1ddd91cb50c4fa47e98b31bc2a Author: cherrylzhao <[email protected]> AuthorDate: Mon Aug 27 14:31:42 2018 +0800 SCB-856 Refactor TCC alpha server with TccCallbackEngine. --- .../servicecomb/saga/alpha/server/AlphaConfig.java | 17 +++--- .../{OmegaCallback.java => CallbackEngine.java} | 9 ++-- .../alpha/server/tcc/GrpcOmegaTccCallback.java | 4 +- .../saga/alpha/server/tcc/GrpcTccEventService.java | 22 ++++---- .../saga/alpha/server/tcc/OmegaCallback.java | 3 +- ...megaCallback.java => OmegaCallbackWrapper.java} | 15 ++++-- .../alpha/server/tcc/OmegaCallbacksRegistry.java | 11 ++++ .../saga/alpha/server/tcc/TccCallbackEngine.java | 61 ++++++++++++++++++++++ 8 files changed, 108 insertions(+), 34 deletions(-) diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java index 8d9af1e..91143f4 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java @@ -35,6 +35,8 @@ import org.apache.servicecomb.saga.alpha.core.TxConsistentService; import org.apache.servicecomb.saga.alpha.core.TxEventRepository; import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository; import org.apache.servicecomb.saga.alpha.server.tcc.GrpcTccEventService; +import org.apache.servicecomb.saga.alpha.server.tcc.OmegaCallbackWrapper; +import org.apache.servicecomb.saga.alpha.server.tcc.TccCallbackEngine; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.domain.EntityScan; import org.springframework.context.annotation.Bean; @@ -90,14 +92,11 @@ class AlphaConfig { CommandRepository commandRepository, TxTimeoutRepository timeoutRepository, OmegaCallback omegaCallback) { - - new EventScanner(scheduler, - eventRepository, commandRepository, timeoutRepository, - omegaCallback, eventPollingInterval).run(); - - TxConsistentService consistentService = new TxConsistentService(eventRepository); - - return consistentService; + new EventScanner(scheduler, + eventRepository, commandRepository, timeoutRepository, + omegaCallback, eventPollingInterval).run(); + TxConsistentService consistentService = new TxConsistentService(eventRepository); + return consistentService; } @Bean @@ -105,7 +104,7 @@ class AlphaConfig { Map<String, Map<String, OmegaCallback>> omegaCallbacks) { ServerStartable bootstrap = new GrpcStartable(serverConfig, new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks), - new GrpcTccEventService()); + new GrpcTccEventService(new TccCallbackEngine(new OmegaCallbackWrapper()))); new Thread(bootstrap::start).start(); return bootstrap; } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/CallbackEngine.java similarity index 81% copy from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java copy to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/CallbackEngine.java index 369472c..e69eb6a 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/CallbackEngine.java @@ -17,12 +17,9 @@ package org.apache.servicecomb.saga.alpha.server.tcc; -import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent; -public interface OmegaCallback { +public interface CallbackEngine { - void invoke(ParticipatedEvent event, String status); - - default void disconnect() { - } + boolean execute(GrpcTccTransactionEndedEvent request); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java index 8ea7cfb..e1fb68a 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java @@ -34,13 +34,13 @@ public final class GrpcOmegaTccCallback implements OmegaCallback { } @Override - public void invoke(ParticipatedEvent event, String status) { + public void invoke(ParticipatedEvent event, TransactionStatus status) { GrpcTccCoordinateCommand command = GrpcTccCoordinateCommand.newBuilder() .setGlobalTxId(event.getGlobalTxId()) .setLocalTxId(event.getLocalTxId()) .setParentTxId(event.getParentTxId() == null ? "" : event.getParentTxId()) .setServiceName(event.getServiceName()) - .setMethod("Succeed".equals(status) ? event.getConfirmMethod() : event.getCancelMethod()) + .setMethod(TransactionStatus.Succeed.equals(status) ? event.getConfirmMethod() : event.getCancelMethod()) .build(); responseObserver.onNext(command); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java index ef9adbe..aa20f77 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java @@ -18,9 +18,7 @@ package org.apache.servicecomb.saga.alpha.server.tcc; import io.grpc.stub.StreamObserver; -import org.apache.servicecomb.saga.alpha.core.AlphaException; import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipateEventFactory; -import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand; @@ -35,8 +33,16 @@ import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc; public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImplBase { private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build(); + private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build(); + private final TccCallbackEngine tccCallbackEngine; + + public GrpcTccEventService( + TccCallbackEngine tccCallbackEngine) { + this.tccCallbackEngine = tccCallbackEngine; + } + @Override public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcTccCoordinateCommand> responseObserver) { OmegaCallbacksRegistry.register(request, responseObserver); @@ -57,18 +63,10 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl @Override public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, StreamObserver<GrpcAck> responseObserver) { - try { - for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) { - OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId()) - .invoke(event, request.getStatus()); - } - } catch (AlphaException ex) { - responseObserver.onNext(REJECT); - } - responseObserver.onNext(ALLOW); + responseObserver.onNext(tccCallbackEngine.execute(request) ? ALLOW : REJECT); responseObserver.onCompleted(); } - + @Override public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) { OmegaCallback omegaCallback = OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId()); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java index 369472c..41a6c06 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java @@ -18,10 +18,11 @@ package org.apache.servicecomb.saga.alpha.server.tcc; import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent; +import org.apache.servicecomb.saga.common.TransactionStatus; public interface OmegaCallback { - void invoke(ParticipatedEvent event, String status); + void invoke(ParticipatedEvent event, TransactionStatus status); default void disconnect() { } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbackWrapper.java similarity index 65% copy from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java copy to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbackWrapper.java index 369472c..3b55ffa 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbackWrapper.java @@ -18,11 +18,18 @@ package org.apache.servicecomb.saga.alpha.server.tcc; import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent; +import org.apache.servicecomb.saga.common.TransactionStatus; -public interface OmegaCallback { +public class OmegaCallbackWrapper implements OmegaCallback { - void invoke(ParticipatedEvent event, String status); - - default void disconnect() { + @Override + public void invoke(ParticipatedEvent event, TransactionStatus status) { + OmegaCallback omegaCallback = OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId()); + try { + omegaCallback.invoke(event, status); + } catch (Exception ex) { + OmegaCallbacksRegistry.removeByValue(event.getServiceName(), omegaCallback); + throw ex; + } } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java index 834a5a2..9e228da 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java @@ -20,17 +20,22 @@ package org.apache.servicecomb.saga.alpha.server.tcc; import static java.util.Collections.emptyMap; import io.grpc.stub.StreamObserver; +import java.lang.invoke.MethodHandles; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.servicecomb.saga.alpha.core.AlphaException; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Manage omega callbacks. */ public final class OmegaCallbacksRegistry { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final static Map<String, Map<String, OmegaCallback>> REGISTRY = new ConcurrentHashMap<>(); public static Map<String, Map<String, OmegaCallback>> getRegistry() { @@ -64,11 +69,17 @@ public final class OmegaCallbacksRegistry { } OmegaCallback result = callbackMap.get(instanceId); if (null == result) { + LOG.info("Cannot find the service with the instanceId {}, call the other instance.", instanceId); return callbackMap.values().iterator().next(); } return result; } + public static void removeByValue(String serviceName, OmegaCallback omegaCallback) { + Map<String, OmegaCallback> callbackMap = REGISTRY.getOrDefault(serviceName, emptyMap()); + callbackMap.values().remove(omegaCallback); + } + /** * Retrieve omega TCC callback by service name and instance id, then remove it from registry. * diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngine.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngine.java new file mode 100644 index 0000000..f9aae88 --- /dev/null +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngine.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.server.tcc; + +import java.lang.invoke.MethodHandles; +import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent; +import org.apache.servicecomb.saga.common.TransactionStatus; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TccCallbackEngine implements CallbackEngine { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final OmegaCallbackWrapper omegaCallbackWrapper; + + public TccCallbackEngine(OmegaCallbackWrapper omegaCallbackWrapper) { + this.omegaCallbackWrapper = omegaCallbackWrapper; + } + + @Override + public boolean execute(GrpcTccTransactionEndedEvent request) { + boolean result = true; + for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) { + try { + omegaCallbackWrapper.invoke(event, TransactionStatus.valueOf(request.getStatus())); + } catch (Exception ex) { + logError(event, ex); + result = false; + } + } + return result; + } + + private void logError(ParticipatedEvent event, Exception ex) { + LOG.error( + "Failed to invoke service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]", + event.getServiceName(), + event.getInstanceId(), + TransactionStatus.Succeed.equals(event.getStatus()) ? event.getConfirmMethod() : event.getCancelMethod(), + event.getGlobalTxId(), + event.getLocalTxId(), + ex); + } +}
