This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 68382f281cbb7c9b814e9ceb5ed75db4614a7c84 Author: Willem Jiang <[email protected]> AuthorDate: Tue Aug 28 09:11:41 2018 +0800 SCB-818 Implements the TccEventServie of grpc --- omega/omega-connector/omega-connector-grpc/pom.xml | 6 +- .../grpc/GrpcCompensateStreamObserver.java | 3 +- ...rver.java => GrpcCoordinateStreamObserver.java} | 37 +-- .../omega/connector/grpc/GrpcTccEventService.java | 156 +++++++++++++ .../connector/grpc/GrpcTccEventServiceTest.java | 260 +++++++++++++++++++++ pom.xml | 5 + 6 files changed, 440 insertions(+), 27 deletions(-) diff --git a/omega/omega-connector/omega-connector-grpc/pom.xml b/omega/omega-connector/omega-connector-grpc/pom.xml index 7a14097..f4d7bf1 100644 --- a/omega/omega-connector/omega-connector-grpc/pom.xml +++ b/omega/omega-connector/omega-connector-grpc/pom.xml @@ -18,7 +18,6 @@ ~ ~ --> - <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> @@ -59,6 +58,11 @@ </dependency> <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-testing</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <scope>test</scope> diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java index 9d9c312..6aae96a 100644 --- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java @@ -59,11 +59,12 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma @Override public void onError(Throwable t) { - LOG.error("failed to process grpc compensate command.", t); + LOG.error("Failed to process grpc compensate command.", t); errorHandler.run(); } @Override public void onCompleted() { + // Do nothing here } } diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java similarity index 54% copy from omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java copy to omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java index 9d9c312..20f5974 100644 --- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,56 +13,44 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * - * */ package org.apache.servicecomb.saga.omega.connector.grpc; import java.lang.invoke.MethodHandles; -import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer; -import org.apache.servicecomb.saga.omega.transaction.MessageHandler; -import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand; +import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.grpc.stub.StreamObserver; -class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> { +public class GrpcCoordinateStreamObserver implements StreamObserver<GrpcTccCoordinateCommand> { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final MessageHandler messageHandler; - private final Runnable errorHandler; - private final MessageDeserializer deserializer; + - GrpcCompensateStreamObserver(MessageHandler messageHandler, Runnable errorHandler, MessageDeserializer deserializer) { + public GrpcCoordinateStreamObserver(MessageHandler messageHandler) { this.messageHandler = messageHandler; - this.errorHandler = errorHandler; - this.deserializer = deserializer; } @Override - public void onNext(GrpcCompensateCommand command) { - LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensation method: {}", - command.getGlobalTxId(), command.getLocalTxId(), command.getCompensationMethod()); - - messageHandler.onReceive( - command.getGlobalTxId(), - command.getLocalTxId(), - command.getParentTxId().isEmpty() ? null : command.getParentTxId(), - command.getCompensationMethod(), - deserializer.deserialize(command.getPayloads().toByteArray())); + public void onNext(GrpcTccCoordinateCommand command) { + LOG.info("Received coordinate command, global tx id: {}, local tx id: {}, call method: {}", + command.getGlobalTxId(), command.getLocalTxId(), command.getMethod()); + messageHandler.onReceive(command.getGlobalTxId(), command.getLocalTxId(), command.getParentTxId(), command.getMethod()); } @Override public void onError(Throwable t) { - LOG.error("failed to process grpc compensate command.", t); - errorHandler.run(); + //TODO need to find a way to handle the error + LOG.error("Failed to process grpc coordinate command.", t); } @Override public void onCompleted() { + // Do nothing here } } diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java new file mode 100644 index 0000000..cfcb945 --- /dev/null +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.omega.connector.grpc; + +import org.apache.servicecomb.saga.omega.context.ServiceConfig; +import org.apache.servicecomb.saga.omega.transaction.AlphaResponse; +import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler; +import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent; + +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinatedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc; +import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub; +import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub; + +import io.grpc.ManagedChannel; + +public class GrpcTccEventService implements TccEventService { + private final GrpcServiceConfig serviceConfig; + private final String target; + private final TccEventServiceBlockingStub tccBlockingEventService; + private final TccEventServiceStub tccAsyncEventService; + private final GrpcCoordinateStreamObserver observer; + + public GrpcTccEventService(ServiceConfig serviceConfig, + ManagedChannel channel, + String address, + MessageHandler handler + ) { + this.target = address; + tccBlockingEventService = TccEventServiceGrpc.newBlockingStub(channel); + tccAsyncEventService = TccEventServiceGrpc.newStub(channel); + this.serviceConfig = serviceConfig(serviceConfig.serviceName(), serviceConfig.instanceId()); + observer = new GrpcCoordinateStreamObserver(handler); + } + + @Override + public void onConnected() { + tccAsyncEventService.onConnected(serviceConfig, observer); + } + + @Override + public void onDisconnected() { + tccBlockingEventService.onDisconnected(serviceConfig); + } + + @Override + public void close() { + // do nothing here + } + + @Override + public String target() { + return target; + } + + @Override + public AlphaResponse participate(ParticipatedEvent participateEvent) { + GrpcAck grpcAck = tccBlockingEventService.participate(convertTo(participateEvent)); + return new AlphaResponse(grpcAck.getAborted()); + } + + @Override + public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) { + GrpcAck grpcAck = tccBlockingEventService.onTccTransactionStarted(convertTo(tccStartEvent)); + return new AlphaResponse(grpcAck.getAborted()); + } + + + @Override + public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) { + GrpcAck grpcAck = tccBlockingEventService.onTccTransactionEnded(convertTo(tccEndEvent)); + return new AlphaResponse(grpcAck.getAborted()); + + } + + @Override + public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) { + GrpcAck grpcAck = tccBlockingEventService.onTccCoordinated(convertTo(coordinatedEvent)); + return new AlphaResponse(grpcAck.getAborted()); + } + + private GrpcTccCoordinatedEvent convertTo(CoordinatedEvent coordinatedEvent) { + return GrpcTccCoordinatedEvent.newBuilder() + .setServiceName(serviceConfig.getServiceName()) + .setInstanceId(serviceConfig.getInstanceId()) + .setGlobalTxId(coordinatedEvent.getGlobalTxId()) + .setLocalTxId(coordinatedEvent.getLocalTxId()) + .setParentTxId(coordinatedEvent.getParentTxId()) + .setMethodName(coordinatedEvent.getMethodName()) + .setStatus(coordinatedEvent.getStatus().toString()) + .build(); + } + + private GrpcServiceConfig serviceConfig(String serviceName, String instanceId) { + return GrpcServiceConfig.newBuilder() + .setServiceName(serviceName) + .setInstanceId(instanceId) + .build(); + } + + private GrpcTccTransactionStartedEvent convertTo(TccStartedEvent tccStartEvent) { + return GrpcTccTransactionStartedEvent.newBuilder() + .setServiceName(serviceConfig.getServiceName()) + .setInstanceId(serviceConfig.getInstanceId()) + .setGlobalTxId(tccStartEvent.getGlobalTxId()) + .setLocalTxId(tccStartEvent.getLocalTxId()) + .build(); + } + + private GrpcTccTransactionEndedEvent convertTo(TccEndedEvent tccEndEvent) { + return GrpcTccTransactionEndedEvent.newBuilder() + .setServiceName(serviceConfig.getServiceName()) + .setInstanceId(serviceConfig.getInstanceId()) + .setGlobalTxId(tccEndEvent.getGlobalTxId()) + .setLocalTxId(tccEndEvent.getLocalTxId()) + .setStatus(tccEndEvent.getStatus().toString()) + .build(); + } + + private GrpcTccParticipatedEvent convertTo(ParticipatedEvent participateEvent) { + return GrpcTccParticipatedEvent.newBuilder() + .setServiceName(serviceConfig.getServiceName()) + .setInstanceId(serviceConfig.getInstanceId()) + .setGlobalTxId(participateEvent.getGlobalTxId()) + .setLocalTxId(participateEvent.getLocalTxId()) + .setParentTxId(participateEvent.getParentTxId()) + .setCancelMethod(participateEvent.getCancelMethod()) + .setConfirmMethod(participateEvent.getConfirmMethod()) + .setStatus(participateEvent.getStatus().toString()) + .build(); + } +} diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java new file mode 100644 index 0000000..b4cfef2 --- /dev/null +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.omega.connector.grpc; + +import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import org.apache.servicecomb.saga.common.TransactionStatus; +import org.apache.servicecomb.saga.omega.context.ServiceConfig; +import org.apache.servicecomb.saga.omega.transaction.AlphaResponse; +import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinatedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceImplBase; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import io.grpc.ManagedChannel; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.util.MutableHandlerRegistry; + +@RunWith(JUnit4.class) +public class GrpcTccEventServiceTest { + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); + private final GrpcAck ack = GrpcAck.newBuilder().setAborted(false).build(); + + private final String globalTxId = uniquify("globalTxId"); + private final String localTxId = uniquify("localTxId"); + private final String parentTxId = uniquify("parentTxId"); + private final String methodName = uniquify("methodName"); + private final String confirmMethod = uniquify("confirmMethod"); + private final String cancelMethod = uniquify("cancleMethod"); + private final String serviceName = uniquify("serviceName"); + + private final ServiceConfig serviceConfig = new ServiceConfig(uniquify("Service")); + private final String address = uniquify("Address"); + private final MessageHandler handler = mock(MessageHandler.class); + private GrpcTccEventService service; + + @Before + public void setUp() throws Exception { + // Generate a unique in-process server name. + String serverName = InProcessServerBuilder.generateName(); + + // Create a server, add service, start, and register for automatic graceful shutdown. + grpcCleanup.register(InProcessServerBuilder.forName(serverName). + fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start()); + + // Create a client channel and register for automatic graceful shutdown. + ManagedChannel channel = grpcCleanup.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); + + // Create a TccEventServiceStub using the in-process channel; + service = new GrpcTccEventService(serviceConfig, channel, address, handler); + } + + @Test + public void serviceOnDisconnectedTest() { + + final GrpcServiceConfig[] requestCaptor = new GrpcServiceConfig[1]; + + TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() { + + public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) { + requestCaptor[0] = request; + responseObserver.onNext(ack); + responseObserver.onCompleted(); + } + }; + + serviceRegistry.addService(serviceImpl); + service.onDisconnected(); + + assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName())); + assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId())); + } + + + @Test + public void serviceOnConnectedTest() { + final GrpcTccCoordinateCommand coordinateCommand = + GrpcTccCoordinateCommand.newBuilder() + .setGlobalTxId(globalTxId) + .setLocalTxId(localTxId) + .setParentTxId(parentTxId) + .setMethod(methodName) + .setServiceName(serviceName) + .build(); + + final GrpcServiceConfig[] requestCaptor = new GrpcServiceConfig[1]; + + TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() { + @Override + public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcTccCoordinateCommand> responseObserver) { + requestCaptor[0] = request; + // Just send the coordinateCommand back + responseObserver.onNext(coordinateCommand); + responseObserver.onCompleted(); + } + }; + + serviceRegistry.addService(serviceImpl); + service.onConnected(); + + assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName())); + assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId())); + + verify(handler).onReceive(globalTxId, localTxId,parentTxId, methodName); + + } + + @Test + public void serviceOnTransactionStartTest() { + + final GrpcTccTransactionStartedEvent[] requestCaptor = new GrpcTccTransactionStartedEvent[1]; + TccStartedEvent event = new TccStartedEvent(globalTxId,localTxId); + + TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() { + + public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request, + io.grpc.stub.StreamObserver<GrpcAck> responseObserver) { + requestCaptor[0] = request; + responseObserver.onNext(ack); + responseObserver.onCompleted(); + } + }; + + serviceRegistry.addService(serviceImpl); + AlphaResponse response =service.tccTransactionStart(event); + + assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName())); + assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId())); + assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId)); + assertThat(requestCaptor[0].getLocalTxId(), is(localTxId)); + assertThat(response.aborted(), is(false)); + } + + @Test + public void serviceOnTransactionEndTest() { + + final GrpcTccTransactionEndedEvent[] requestCaptor = new GrpcTccTransactionEndedEvent[1]; + TccEndedEvent event = new TccEndedEvent(globalTxId,localTxId, TransactionStatus.Failed); + + TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() { + + public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, + io.grpc.stub.StreamObserver<GrpcAck> responseObserver) { + requestCaptor[0] = request; + responseObserver.onNext(ack); + responseObserver.onCompleted(); + } + }; + + serviceRegistry.addService(serviceImpl); + AlphaResponse response =service.tccTransactionStop(event); + + assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName())); + assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId())); + assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId)); + assertThat(requestCaptor[0].getLocalTxId(), is(localTxId)); + assertThat(requestCaptor[0].getStatus(), is(TransactionStatus.Failed.toString())); + assertThat(response.aborted(), is(false)); + } + + @Test + public void serviceOnParticipateTest() { + + final GrpcTccParticipatedEvent[] requestCaptor = new GrpcTccParticipatedEvent[1]; + ParticipatedEvent event = new ParticipatedEvent(globalTxId,localTxId, parentTxId, confirmMethod, cancelMethod, TransactionStatus.Succeed); + + TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() { + + public void participate(GrpcTccParticipatedEvent request, + StreamObserver<GrpcAck> responseObserver) { + requestCaptor[0] = request; + responseObserver.onNext(ack); + responseObserver.onCompleted(); + } + }; + + serviceRegistry.addService(serviceImpl); + AlphaResponse response =service.participate(event); + + assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName())); + assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId())); + assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId)); + assertThat(requestCaptor[0].getLocalTxId(), is(localTxId)); + assertThat(requestCaptor[0].getParentTxId(), is(parentTxId)); + assertThat(requestCaptor[0].getCancelMethod(), is(cancelMethod)); + assertThat(requestCaptor[0].getConfirmMethod(), is(confirmMethod)); + assertThat(requestCaptor[0].getStatus(), is(TransactionStatus.Succeed.toString())); + assertThat(response.aborted(), is(false)); + } + + @Test + public void serviceOnCoordinateTest() { + + final GrpcTccCoordinatedEvent[] requestCaptor = new GrpcTccCoordinatedEvent[1]; + CoordinatedEvent event = new CoordinatedEvent(globalTxId,localTxId, parentTxId, methodName, TransactionStatus.Succeed); + + TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() { + + public void onTccCoordinated(GrpcTccCoordinatedEvent request, + io.grpc.stub.StreamObserver<GrpcAck> responseObserver) { + requestCaptor[0] = request; + responseObserver.onNext(ack); + responseObserver.onCompleted(); + } + }; + + serviceRegistry.addService(serviceImpl); + AlphaResponse response =service.coordinate(event); + + assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName())); + assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId())); + assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId)); + assertThat(requestCaptor[0].getLocalTxId(), is(localTxId)); + assertThat(requestCaptor[0].getMethodName(), is(methodName)); + assertThat(requestCaptor[0].getStatus(), is(TransactionStatus.Succeed.toString())); + assertThat(response.aborted(), is(false)); + } + + + +} diff --git a/pom.xml b/pom.xml index 83e5366..57ab449 100644 --- a/pom.xml +++ b/pom.xml @@ -503,6 +503,11 @@ <version>${grpc.version}</version> </dependency> <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-testing</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> <version>${kryo.version}</version>
